从Postgres使用jOOQ获取的流未返回类的结果 [英] Stream fetched from Postgres with jOOQ not returning results from class

查看:91
本文介绍了从Postgres使用jOOQ获取的流未返回类的结果的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正尝试从 postgres 结果c $ c>查询前端应用程序,而不是急于获取所有结果。问题是我只能在终端中看到流式结果(即首先在 org.jooq.tools.LoggerListener中:获取记录:... ,然后使用 stream.get()。forEach(s-> debug)),引用该流的类仅产生 null 值在前端查看 ResultSet 时。

I am attempting to stream results from a postgres query to a frontend application, rather than eagerly fetching all results. The problem is that I can only see streamed results only in my terminal (i.e. first in "org.jooq.tools.LoggerListener : Record fetched: ..." and then with a stream.get().forEach(s -> debug)), and the class which references this stream only produces null values when called upon to view the ResultSet in the frontend.

此数据也可以用于其他任务(例如可视化,下载/导出,摘要统计信息等)。我一直在浏览有关 jOOQ 的文档和帖子,我正在将它们用作我的ORM,并且尝试使用以下方法:

This data may be used for other tasks as well (e.g. visualization, downloading / exporting, summary statistics, etc.). I have been looking through the documentation and posts about jOOQ, which I am using as my ORM, and there are the following methods which I am trying to use :

  • DefaultDSLContext.fetchStream()
  • ResultQuery.stream()
  • ResultQuery.fetchStream()

现在,使用下面的方法进行完美的抓取是很理想的,但这将返回一个巨型 ResponseEntity code>并且不会流式传输结果:

Eagerly fetching with the following works perfectly for now, but this will return everything in one giant ResponseEntity and won't stream results :

  • ResultQuery.fetchMaps()

DataController.java

@RestController
@RequestMapping(value = "/v3")
@Validated
public class DataController {

  @Autowired private QueryService queryService;

  @PostMapping(value = "/data", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
  @ApiOperation(value = "Query the data")
  @ResponseStatus(HttpStatus.CREATED)
  public ResponseEntity<QueryResult> getQueryResults(
      @RequestBody @ValidQuery Query query, HttpServletRequest request) {

    QueryResult res = queryService.search(query);
    return ResponseEntity.ok(res);
  }
// ...
}

QueryResult.java

public QueryResult(Stream<Record> result) {
    this.result = result;
  }

//  public List<Map<String, Object>> getResult() { return result; }
  @JsonProperty("result")
  public Stream<Record> getResult() { return result; }


//  public void setResult(List<Map<String, Object>> result) { this.result = result; }
  public void setResult(Stream<Record> result) { this.result = result; }

}

QueryService.java

@Service
public class QueryService implements SearchService{
  @Autowired DefaultDSLContext dslContext;

  public QueryResult search(Query query) {

    LinkedHashMap<DataSourceName, List<String>> selections = query.getSelections();

    // Build selected fields
    List<SelectField> selectFields = QueryUtils.getSelectionFields(selections);

    // Current support is for a single query. All others passed will be ignored
    List<Filter> filters = query.getFilters();
    Filter leadingFilter = QueryUtils.getLeadingFilter(filters);

    // Build "where" conditions
    Condition conditionClause = QueryUtils.getConditionClause(leadingFilter);

    // Get "from" statement
    Table<Record> fromClause = QueryUtils.getFromStatement(fromDataSource,query.getJoins());

    /*
    // Works fine, but is not lazy fetching
    List<Map<String, Object>> results =
        dslContext
            .select(selectFields)
            .from(fromClause)
            .where(conditionClause)
            .limit(query.getOffset(), query.getLimit())
            .fetchMaps();
    */

      // Appears to work only once. 
      // Cannot see any results returned, but the number of records is correct. 
      // Everything in the records is null / undefined in the frontend
      Supplier<Stream<Record>> results = () ->
              dslContext
                      .select(selectFields)
                      .from(fromClause)
                      .where(conditionClause)
                      .limit(query.getOffset(), query.getLimit())
                      .fetchStream();

      // "stream has already been operated upon or closed" is returned when using a Supplier
      results.get().forEach(s -> logger.debug("Streamed record: \n" + String.valueOf(s)));

      return new QueryResult(results.get());

  }
}

Query.java

public class Query {
  @NotNull(message = "Query must contain selection(s)")
  private LinkedHashMap<DataSourceName, List<String>> selections;
  private List<Filter> filters;
  private List<Join> joins;
  private List<Sort> sorts;
  private long offset;
  private int limit;

  private QueryOptions options;

  @JsonProperty("selections")
  public LinkedHashMap<DataSourceName, List<String>> getSelections() {
    return selections;
  }

  public void setSelections(LinkedHashMap<DataSourceName, List<String>> selections) {
    this.selections = selections;
  }

  @JsonProperty("filters")
  public List<Filter> getFilters() {
    return filters;
  }

  public void setFilters(List<Filter> filters) {
    this.filters = filters;
  }

  @JsonProperty("joins")
  public List<Join> getJoins() {
    return joins;
  }

  public void setJoins(List<Join> joins) {
    this.joins = joins;
  }

  @JsonProperty("sorts")
  public List<Sort> getSorts() {
    return sorts;
  }

  public void setSorts(List<Sort> sorts) {
    this.sorts = sorts;
  }

  @JsonProperty("options")
  public QueryOptions getOptions() {
    return options;
  }

  public void setOptions(QueryOptions options) {
    this.options = options;
  }

  @JsonProperty("offset")
  public long getOffset() {
    return offset;
  }

  public void setOffset(long offset) {
    this.offset = offset;
  }

  @JsonProperty("limit")
  public int getLimit() {
    return limit;
  }

  public void setLimit(int limit) {
    this.limit = limit;
  }

  @Override
  public String toString() {
    return "Query{"
        + "selections=" + selections
        + ", filters="  + filters
        + ", sorts="    + sorts
        + ", offSet="   + offset
        + ", limit="    + limit
        + ", options="  + options
        + '}';
  }
}

DataApi.js

// ...
const dataApi = axios.create({baseURL: `${my_data_url}`,});
// ...
export default dataApi;

Data.jsx

// ...

// This block queries Spring, and it returns the ResponseEntity with the ResultSet
// Streaming returns the right number of records, but every record is null / undefined
try {
      const response = await dataApi.post('/v3/data', query);
} catch (error) {
// ...
}
// ...

控制台中的返回结果

{data: {…}, status: 200, statusText: "OK", headers: {…}, config: {…}, …}
data:
result: Array(100)
0: {}
1: {}
2: {}
3: {}
...






堆栈:




  • Docker :19.03.5

  • Spring Boot :v2.1.8.RELEASE

  • 节点:v12.13.1

  • 反应:16.9.0

  • OpenJDK :12.0.2

  • jOOQ :3.12.3

  • postgres :10.7


  • Stack :

    • Docker : 19.03.5
    • Spring Boot : v2.1.8.RELEASE
    • Node : v12.13.1
    • React : 16.9.0
    • OpenJDK : 12.0.2
    • jOOQ : 3.12.3
    • postgres : 10.7
    • 推荐答案

      Java Stream API最多可以使用一次这样的流。它没有任何缓冲功能,也不像被动流实现那样支持基于推的流模型。

      The whole point of the Java Stream API is for such a stream to be consumed at most once. It does not have any buffering feature, nor does it support a push based streaming model like reactive stream implementations do.

      您可以在堆栈中添加另一个API,例如 Reactor (还有其他人,但由于您已经在使用Spring ...),它支持缓冲和重播流给多个使用者,但是与jOOQ直接无关,并且会严重影响应用程序的体系结构。

      You could add another API to your stack, such as e.g. Reactor (there are others, but since you're already using Spring...), which supports buffering and replaying streams to several consumers, but that has nothing to do with jOOQ directly and will heavily influence your application's architecture.

      注意jOOQ的 ResultQuery 扩展了 org.reactivestreams.Publisher 和JDK 9的 Flow.Publisher ,以提高与此类反应流的互操作性。

      Notice that jOOQ's ResultQuery extends org.reactivestreams.Publisher and JDK 9's Flow.Publisher for better interoperability with such reactive streams.

      这篇关于从Postgres使用jOOQ获取的流未返回类的结果的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆