从Postgres使用jOOQ获取的流未返回类的结果 [英] Stream fetched from Postgres with jOOQ not returning results from class
问题描述
我正尝试从 postgres $进行
流
结果c $ c>查询前端应用程序,而不是急于获取所有结果。问题是我只能在终端中看到流式结果(即首先在 org.jooq.tools.LoggerListener中:获取记录:...
,然后使用 stream.get()。forEach(s-> debug)
),引用该流的类仅产生 null $ c调用$ c>值在前端查看
ResultSet
时。
I am attempting to stream
results from a postgres
query to a frontend application, rather than eagerly fetching all results. The problem is that I can only see streamed results only in my terminal (i.e. first in "org.jooq.tools.LoggerListener : Record fetched: ..."
and then with a stream.get().forEach(s -> debug)
), and the class which references this stream only produces null
values when called upon to view the ResultSet
in the frontend.
此数据也可以用于其他任务(例如可视化,下载/导出,摘要统计信息等)。我一直在浏览有关 jOOQ
的文档和帖子,我正在将它们用作我的ORM,并且尝试使用以下方法:
This data may be used for other tasks as well (e.g. visualization, downloading / exporting, summary statistics, etc.). I have been looking through the documentation and posts about jOOQ
, which I am using as my ORM, and there are the following methods which I am trying to use :
- DefaultDSLContext.fetchStream()
- ResultQuery.stream()
- < a href = https://www.jooq.org/javadoc/latest/org.jooq/org/jooq/ResultQuery.html#fetchStream() rel = nofollow noreferrer> ResultQuery.fetchStream()
- DefaultDSLContext.fetchStream()
- ResultQuery.stream()
- ResultQuery.fetchStream()
现在,使用下面的方法进行完美的抓取是很理想的,但这将返回一个巨型 ResponseEntity code>并且不会流式传输结果:
Eagerly fetching with the following works perfectly for now, but this will return everything in one giant ResponseEntity
and won't stream results :
- ResultQuery.fetchMaps()
DataController.java
@RestController
@RequestMapping(value = "/v3")
@Validated
public class DataController {
@Autowired private QueryService queryService;
@PostMapping(value = "/data", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
@ApiOperation(value = "Query the data")
@ResponseStatus(HttpStatus.CREATED)
public ResponseEntity<QueryResult> getQueryResults(
@RequestBody @ValidQuery Query query, HttpServletRequest request) {
QueryResult res = queryService.search(query);
return ResponseEntity.ok(res);
}
// ...
}
QueryResult.java
public QueryResult(Stream<Record> result) {
this.result = result;
}
// public List<Map<String, Object>> getResult() { return result; }
@JsonProperty("result")
public Stream<Record> getResult() { return result; }
// public void setResult(List<Map<String, Object>> result) { this.result = result; }
public void setResult(Stream<Record> result) { this.result = result; }
}
QueryService.java
@Service
public class QueryService implements SearchService{
@Autowired DefaultDSLContext dslContext;
public QueryResult search(Query query) {
LinkedHashMap<DataSourceName, List<String>> selections = query.getSelections();
// Build selected fields
List<SelectField> selectFields = QueryUtils.getSelectionFields(selections);
// Current support is for a single query. All others passed will be ignored
List<Filter> filters = query.getFilters();
Filter leadingFilter = QueryUtils.getLeadingFilter(filters);
// Build "where" conditions
Condition conditionClause = QueryUtils.getConditionClause(leadingFilter);
// Get "from" statement
Table<Record> fromClause = QueryUtils.getFromStatement(fromDataSource,query.getJoins());
/*
// Works fine, but is not lazy fetching
List<Map<String, Object>> results =
dslContext
.select(selectFields)
.from(fromClause)
.where(conditionClause)
.limit(query.getOffset(), query.getLimit())
.fetchMaps();
*/
// Appears to work only once.
// Cannot see any results returned, but the number of records is correct.
// Everything in the records is null / undefined in the frontend
Supplier<Stream<Record>> results = () ->
dslContext
.select(selectFields)
.from(fromClause)
.where(conditionClause)
.limit(query.getOffset(), query.getLimit())
.fetchStream();
// "stream has already been operated upon or closed" is returned when using a Supplier
results.get().forEach(s -> logger.debug("Streamed record: \n" + String.valueOf(s)));
return new QueryResult(results.get());
}
}
Query.java
public class Query {
@NotNull(message = "Query must contain selection(s)")
private LinkedHashMap<DataSourceName, List<String>> selections;
private List<Filter> filters;
private List<Join> joins;
private List<Sort> sorts;
private long offset;
private int limit;
private QueryOptions options;
@JsonProperty("selections")
public LinkedHashMap<DataSourceName, List<String>> getSelections() {
return selections;
}
public void setSelections(LinkedHashMap<DataSourceName, List<String>> selections) {
this.selections = selections;
}
@JsonProperty("filters")
public List<Filter> getFilters() {
return filters;
}
public void setFilters(List<Filter> filters) {
this.filters = filters;
}
@JsonProperty("joins")
public List<Join> getJoins() {
return joins;
}
public void setJoins(List<Join> joins) {
this.joins = joins;
}
@JsonProperty("sorts")
public List<Sort> getSorts() {
return sorts;
}
public void setSorts(List<Sort> sorts) {
this.sorts = sorts;
}
@JsonProperty("options")
public QueryOptions getOptions() {
return options;
}
public void setOptions(QueryOptions options) {
this.options = options;
}
@JsonProperty("offset")
public long getOffset() {
return offset;
}
public void setOffset(long offset) {
this.offset = offset;
}
@JsonProperty("limit")
public int getLimit() {
return limit;
}
public void setLimit(int limit) {
this.limit = limit;
}
@Override
public String toString() {
return "Query{"
+ "selections=" + selections
+ ", filters=" + filters
+ ", sorts=" + sorts
+ ", offSet=" + offset
+ ", limit=" + limit
+ ", options=" + options
+ '}';
}
}
DataApi.js
// ...
const dataApi = axios.create({baseURL: `${my_data_url}`,});
// ...
export default dataApi;
Data.jsx
// ...
// This block queries Spring, and it returns the ResponseEntity with the ResultSet
// Streaming returns the right number of records, but every record is null / undefined
try {
const response = await dataApi.post('/v3/data', query);
} catch (error) {
// ...
}
// ...
控制台中的返回结果
{data: {…}, status: 200, statusText: "OK", headers: {…}, config: {…}, …}
data:
result: Array(100)
0: {}
1: {}
2: {}
3: {}
...
堆栈:
- Docker :19.03.5
- Spring Boot :v2.1.8.RELEASE
- 节点:v12.13.1
- 反应:16.9.0
- OpenJDK :12.0.2
- jOOQ :3.12.3
- postgres :10.7
- Docker : 19.03.5
- Spring Boot : v2.1.8.RELEASE
- Node : v12.13.1
- React : 16.9.0
- OpenJDK : 12.0.2
- jOOQ : 3.12.3
- postgres : 10.7
Stack :
推荐答案
Java Stream
API最多可以使用一次这样的流。它没有任何缓冲功能,也不像被动流实现那样支持基于推的流模型。
The whole point of the Java Stream
API is for such a stream to be consumed at most once. It does not have any buffering feature, nor does it support a push based streaming model like reactive stream implementations do.
您可以在堆栈中添加另一个API,例如 Reactor
(还有其他人,但由于您已经在使用Spring ...),它支持缓冲和重播流给多个使用者,但是与jOOQ直接无关,并且会严重影响应用程序的体系结构。
You could add another API to your stack, such as e.g. Reactor
(there are others, but since you're already using Spring...), which supports buffering and replaying streams to several consumers, but that has nothing to do with jOOQ directly and will heavily influence your application's architecture.
注意jOOQ的 ResultQuery
扩展了 org.reactivestreams.Publisher
和JDK 9的 Flow.Publisher
,以提高与此类反应流的互操作性。
Notice that jOOQ's ResultQuery
extends org.reactivestreams.Publisher
and JDK 9's Flow.Publisher
for better interoperability with such reactive streams.
这篇关于从Postgres使用jOOQ获取的流未返回类的结果的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!