Reading from BigQuery and storing data in Google Cloud Storage (special character issue)

Problem description

Reference: Can Google Dataflow use existing VMs instead of temporarily created ones?

The code works, but when it saves the response from BigQuery to Google Cloud Storage, all the Japanese characters are corrupted.

PCollectionTuple QVCollections = rows
        .apply("FilterEmptyRows", ParDo.of(new FilterEmptyRowDoFn("TransactionId", "TransactionDateTime")))
        .apply("CreateQVFiles", ParDo.of(new TransactionToQVFilesDoFnJP())
                .withOutputTags(BobShare.QVHeaders, TupleTagList.of(BobShare.QVEvents).and(BobShare.QVPayments)));

QVCollections.get(BobShare.QVEvents).apply("WriteQVEvents", TextIO.write().to(storagePath + CSV_OUTPUT_FOLDER + "events_" + timeSuffix)
        .withoutSharding().withHeader(CSV_HEADER_EVENTS).withSuffix(".csv"));
QVCollections.get(BobShare.QVPayments).apply("WriteQVPayments", TextIO.write().to(storagePath + CSV_OUTPUT_FOLDER + "payments_" + timeSuffix)
        .withoutSharding().withHeader(CSV_HEADER_PAYMENTS).withSuffix(".csv"));
QVCollections.get(BobShare.QVHeaders).apply("WriteQVHeaders", TextIO.write().to(storagePath + CSV_OUTPUT_FOLDER + "header_" + timeSuffix)
        .withoutSharding().withHeader(CSV_HEADER_TRANSACTION).withSuffix(".csv"));
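For comparison outside Dataflow, the following plain-JDK sketch writes what each of the three write steps above is meant to produce: a single file (no sharding), the header as the first line, then one element per line, encoded explicitly as UTF-8. It is a hypothetical local analogue, not Beam code; writeCsv, the column name Name and the sample row are made up for illustration, and the Japanese text is written as Unicode escapes so the source compiles regardless of the compiler's encoding setting.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

public class LocalCsvWriter {

    // Hypothetical local analogue of one write step: one unsharded file named
    // prefix + ".csv", header first, then one row per line, all encoded as UTF-8.
    static Path writeCsv(Path dir, String prefix, String header, List<String> rows) throws IOException {
        Path file = dir.resolve(prefix + ".csv");
        try (BufferedWriter out = Files.newBufferedWriter(file, StandardCharsets.UTF_8)) {
            out.write(header);
            out.write('\n');
            for (String row : rows) {
                out.write(row);
                out.write('\n');
            }
        }
        return file;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("qv");
        // The escapes spell the surname Yamada; they keep this source file ASCII-only.
        Path file = writeCsv(dir, "events_example", "TransactionId,Name", List.of("1,\u5C71\u7530"));
        // Reading back with the same charset returns the rows unchanged.
        System.out.println(Files.readAllLines(file, StandardCharsets.UTF_8).size() + " lines read");
    }
}
```

If a file produced this way opens correctly but the Dataflow output does not, the difference is likely upstream of the write (for example in how the DoFns build their strings) rather than in the file format itself.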

Based on what I have found, I need to use .withCoder(StringUtf8Coder.of()).
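In the Beam Java SDK a coder is attached to a PCollection with setCoder(...), and StringUtf8Coder is the coder that encodes each String as UTF-8 bytes. The sketch below is a hypothetical, plain-JDK stand-in for that contract only (it is not Beam's implementation, and its wire format is not claimed to match): each value is written as a length prefix followed by its UTF-8 bytes, so several values can share one stream and still decode intact. The class and method names are made up.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of a String <-> UTF-8 coder contract;
// not Beam's StringUtf8Coder and not its wire format.
public class Utf8StringCoderSketch {

    // Write the UTF-8 byte length, then the bytes, so values can share one stream.
    static void encode(String value, DataOutputStream out) throws IOException {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        out.writeInt(bytes.length);
        out.write(bytes);
    }

    // Read exactly one value back, decoding its bytes as UTF-8.
    static String decode(DataInputStream in) throws IOException {
        byte[] bytes = new byte[in.readInt()];
        in.readFully(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        encode("\u652F\u6255\u3044", out);       // "shiharai" (payment)
        encode("\u30A4\u30D9\u30F3\u30C8", out); // "ibento" (event)
        out.flush();

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
        // prints: 3 and 4 characters decoded
        System.out.println(decode(in).length() + " and " + decode(in).length() + " characters decoded");
    }
}
```

The length prefix is counted in bytes, not characters: each of these Japanese characters takes three UTF-8 bytes, which is why a coder must never assume one byte per character.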

In addition, this is what I have tried (but it only works locally, with the DirectRunner):

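import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

private static void uploadBlob(String project, String bucket, String filename, String localfile) {
    String listFromCsv = readCsvFromLocalStorage(localfile);

    Storage storage = StorageOptions.newBuilder().setProjectId(project).build().getService();
    BlobId blobId = BlobId.of(bucket, filename);
    // The charset belongs in Content-Type; Content-Encoding describes compression (e.g. gzip), not a charset.
    BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/csv; charset=utf-8").build();
    // Encode with an explicit Charset so the bytes do not depend on the JVM default
    // (getBytes(Charset) cannot throw UnsupportedEncodingException).
    storage.create(blobInfo, listFromCsv.getBytes(StandardCharsets.UTF_8));
}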


private static String readCsvFromLocalStorage(String fileName) {
    StringBuilder builder = new StringBuilder();
    Path pathToFile = Paths.get(fileName);

    try (BufferedReader br = Files.newBufferedReader(pathToFile,
            StandardCharsets.UTF_8)) {

        // read the first line from the text file
        String line = br.readLine();

        // loop until all lines are read
        while (line != null) {
            builder.append(line).append("\n");
            line = br.readLine();
        }

    } catch (IOException ioe) {
        ioe.printStackTrace();
    }

    return builder.toString();
}

private static void deleteLocalFile(String fileName) {
    try {
        if (new File(fileName).delete()) {
            System.out.println(fileName + " deleted.");
        } else {
            System.out.println(fileName + " could not be deleted.");
        }
    } catch (Exception e) {
        System.out.println(fileName + " could not be deleted.");
        e.printStackTrace();
    }
}
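Before calling storage.create(...), it can help to confirm that the byte[] about to be uploaded really is UTF-8. Decoding with new String(bytes, UTF_8) would silently replace bad sequences with U+FFFD, so the hypothetical helper below uses a CharsetDecoder configured to report malformed input instead. It only inspects bytes and does not touch Cloud Storage; Utf8Check and isValidUtf8 are illustrative names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

public class Utf8Check {

    // True only if the whole payload is well-formed UTF-8. A decoder set to REPORT
    // throws on bad input instead of silently substituting U+FFFD.
    static boolean isValidUtf8(byte[] payload) {
        CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder()
                .onMalformedInput(CodingErrorAction.REPORT)
                .onUnmappableCharacter(CodingErrorAction.REPORT);
        try {
            decoder.decode(ByteBuffer.wrap(payload));
            return true;
        } catch (CharacterCodingException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        byte[] utf8 = "\u65E5\u672C\u8A9E,OK\n".getBytes(StandardCharsets.UTF_8);
        // 0x93 is a UTF-8 continuation byte, so it cannot start a sequence.
        byte[] notUtf8 = {(byte) 0x93, (byte) 0xFA};
        System.out.println(isValidUtf8(utf8));    // true
        System.out.println(isValidUtf8(notUtf8)); // false
    }
}
```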

This is what the data looks like (corrupted): Japanese characters
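To make the symptom concrete, here is a small plain-JDK sketch (no Beam or Cloud Storage involved) of how Japanese text typically ends up garbled: the bytes are written as UTF-8 but read back with a different charset. ISO-8859-1 is only an example of a wrong charset; the question does not say which charset is actually involved in this pipeline. MojibakeDemo and decodeUtf8BytesAs are hypothetical names.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class MojibakeDemo {

    // Encode text as UTF-8, then decode those same bytes with another charset.
    static String decodeUtf8BytesAs(String text, Charset readCharset) {
        byte[] utf8Bytes = text.getBytes(StandardCharsets.UTF_8);
        return new String(utf8Bytes, readCharset);
    }

    public static void main(String[] args) {
        String original = "\u65E5\u672C\u8A9E"; // "nihongo", three kanji
        // Matching charsets: the text survives (prints true).
        System.out.println(decodeUtf8BytesAs(original, StandardCharsets.UTF_8).equals(original));
        // Mismatched charsets: each of the 9 UTF-8 bytes becomes its own Latin-1 character.
        String garbled = decodeUtf8BytesAs(original, StandardCharsets.ISO_8859_1);
        System.out.println(garbled.length() + " chars instead of " + original.length()); // 9 chars instead of 3
    }
}
```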

Any suggestions? Anything at all?

Recommended answer

You need to replace

BufferedReader br = Files.newBufferedReader(pathToFile, StandardCharsets.UTF_8)

with

BufferedReader br = Files.newBufferedReader(pathToFile, Charset.forName("UTF-8"))
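As a side note, StandardCharsets.UTF_8 and Charset.forName("UTF-8") refer to the same charset, so that substitution alone does not change how the local file is decoded. What can vary from machine to machine is the JVM default charset, which any conversion that omits a charset silently uses; a difference there between the local DirectRunner machine and the remote workers is one plausible, unconfirmed explanation for the local-only success. The sketch below checks the equivalence and shows the explicit-charset pattern; CharsetCheck and utf8RoundTrip are hypothetical names.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class CharsetCheck {

    // Explicit UTF-8 on both sides of the conversion: independent of the JVM default.
    static String utf8RoundTrip(String text) {
        return new String(text.getBytes(StandardCharsets.UTF_8), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Both expressions denote the same charset (prints true).
        System.out.println(StandardCharsets.UTF_8.equals(Charset.forName("UTF-8")));

        // Used by calls that omit a charset, e.g. String.getBytes() and new String(byte[]).
        // Before JDK 18 it depended on the platform and locale, so it may differ between machines.
        System.out.println("Default charset: " + Charset.defaultCharset());

        String text = "\u65E5\u672C\u8A9E";
        System.out.println(utf8RoundTrip(text).equals(text)); // true
    }
}
```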
