Lazy sorting of entities in Java 8 Stream API on a daily basis?
Question
I have a large Java 8 Stream (Stream<MyObject>) with objects that look like this:
class MyObject {
    private String string;
    private Date timestamp;
    // Getters and setters removed for brevity
}
I know that all timestamps for day 1 will arrive before those of day 2, but within each day the timestamps may be out of order. I'd like to sort the MyObjects in timestamp order on a per-day basis using the Stream API. Since the Stream is large, I have to do this as lazily as possible, i.e. it would be OK to hold one day's worth of MyObjects in memory, but not much more than that.
How can I achieve this?
Update 2017-04-29:
A requirement is that I want to continue working on the same stream after the sorting! I'd like something like this (pseudo code):
Stream<MyObject> sortedStream = myStreamUnsorted().sort(onADailyBasis());
Recommended answer
It depends on whether you need to process the objects of all days or of one specific day.
Building on DiabolicWords's answer, this is an example that processes all days:
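TreeSet<MyObject> currentDaysObjects = new TreeSet<>(Comparator.comparing(MyObject::getTimestamp));
LocalDate[] currentDay = new LocalDate[1]; // one-element array so the lambda can update it
incoming.peek(o -> {
    LocalDate date = o.getTimestamp().toInstant().atZone(ZoneId.systemDefault()).toLocalDate();
    if (!date.equals(currentDay[0]))
    {
        if (currentDay[0] != null)
        {
            // A new day has started: process the finished day and reset the buffer.
            processOneDaysObjects(currentDaysObjects);
            currentDaysObjects.clear();
        }
        currentDay[0] = date;
    }
}).forEach(currentDaysObjects::add);
// The last day is never followed by another day, so process it once the stream ends.
if (!currentDaysObjects.isEmpty())
{
    processOneDaysObjects(currentDaysObjects);
}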
This collects the objects of one day, processes them, resets the collection and continues with the next day.
If you only want one specific day:
TreeSet<MyObject> currentDaysObjects = new TreeSet<>(Comparator.comparing(MyObject::getTimestamp));
LocalDate specificDay = LocalDate.now();
incoming.filter(o -> !o.getTimestamp()
                       .toInstant()
                       .atZone(ZoneId.systemDefault())
                       .toLocalDate()
                       .isBefore(specificDay))
        .peek(o -> currentDaysObjects.add(o))
        .anyMatch(o -> {
            if (o.getTimestamp().toInstant().atZone(ZoneId.systemDefault()).toLocalDate().isAfter(specificDay))
            {
                currentDaysObjects.remove(o);
                return true;
            }
            return false;
        });
The filter skips objects from before specificDay, and the anyMatch terminates the stream at the first object after specificDay.
I have read that Java 9 will add methods such as takeWhile and dropWhile to streams. These would make this a lot easier.
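As a rough illustration of that remark, here is a minimal sketch of the "one specific day" case using the Java 9 methods Stream.dropWhile and Stream.takeWhile. The Event class and the sortedEventsOf helper are hypothetical stand-ins (the timestamp is a LocalDateTime here to keep the date handling short), and the sketch assumes the days arrive in order on a sequential stream.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class SpecificDayExample {

    // Hypothetical stand-in for MyObject, with a LocalDateTime timestamp.
    static final class Event {
        final String name;
        final LocalDateTime timestamp;

        Event(String name, LocalDateTime timestamp) {
            this.name = name;
            this.timestamp = timestamp;
        }

        LocalDateTime getTimestamp() {
            return timestamp;
        }

        @Override
        public String toString() {
            return name + "@" + timestamp;
        }
    }

    // Assumes days arrive in order: skip earlier days, stop at the first later day,
    // and sort only the elements of the requested day.
    static List<Event> sortedEventsOf(Stream<Event> incoming, LocalDate day) {
        return incoming
                .dropWhile(e -> e.getTimestamp().toLocalDate().isBefore(day))
                .takeWhile(e -> !e.getTimestamp().toLocalDate().isAfter(day))
                .sorted(Comparator.comparing(Event::getTimestamp))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        LocalDate day = LocalDate.of(2017, 5, 1);
        Stream<Event> incoming = Stream.of(
                new Event("a", day.minusDays(1).atTime(9, 0)),
                new Event("b", day.atTime(18, 0)),
                new Event("c", day.atTime(8, 0)),
                new Event("d", day.plusDays(1).atTime(7, 0)));
        // Only "c" and "b" belong to the requested day; they are emitted in timestamp order.
        sortedEventsOf(incoming, day).forEach(System.out::println);
    }
}
```

Because takeWhile short-circuits, sorted() only ever buffers the elements of the requested day, not the rest of the stream.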
Edit, in more detail, after the OP specified the goal:
Wow, this is a nice exercise, and quite a tough nut to crack. The problem is that the obvious solution (collecting the stream) always goes through the whole stream. You cannot take the next x elements, order them, stream them, and then repeat without doing it for the whole stream (i.e. all days) at once. For the same reason, calling sorted() on a stream will go through it completely (especially as the stream does not know that the elements are already ordered by day). For reference, read this comment: https://stackoverflow.com/a/27595803/7653073.
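The barrier behaviour of sorted() can be observed with a small experiment. The sketch below (class and method names are made up for illustration) counts how many source elements are pulled before findFirst() can answer, with and without sorted() in the pipeline.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

final class SortedBarrierDemo {

    // Returns how many source elements were pulled before findFirst() returned.
    static int elementsConsumed(boolean withSorted) {
        AtomicInteger consumed = new AtomicInteger();
        Stream<Integer> source = Stream.of(3, 1, 2, 5, 4)
                .peek(i -> consumed.incrementAndGet());
        Optional<Integer> first = (withSorted ? source.sorted() : source).findFirst();
        first.orElseThrow(IllegalStateException::new);
        return consumed.get();
    }

    public static void main(String[] args) {
        System.out.println("without sorted(): " + elementsConsumed(false));
        System.out.println("with sorted():    " + elementsConsumed(true));
    }
}
```

Without sorted() a single element is enough for findFirst(); with sorted() all five elements have to be read first, which is why a plain sorted() cannot work day by day on a large stream.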
As they recommend, here is an Iterator implementation wrapped in a stream. It looks ahead in the original stream, takes the elements of one day, sorts them, and gives you the whole thing in a nice new stream (without keeping all days in memory!). The implementation is more complicated because we do not have a fixed chunk size; we always have to find the first element of the next day to know when to stop.
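import java.time.LocalDate;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Assumes MyObject.getTimestamp() returns a LocalDateTime, as in the usage example.
public class DayByDayIterator implements Iterator<MyObject>
{
    private final Iterator<MyObject> incoming;
    private MyObject next;                 // element returned by the next call to next()
    private Iterator<MyObject> currentDay; // sorted elements of the day being emitted
    private MyObject firstOfNextDay;       // look-ahead element; null once the source is exhausted
    private Set<MyObject> nextDaysObjects = newDaySet();

    public static Stream<MyObject> streamOf(Stream<MyObject> incoming)
    {
        Iterable<MyObject> iterable = () -> new DayByDayIterator(incoming);
        return StreamSupport.stream(iterable.spliterator(), false);
    }

    private static Set<MyObject> newDaySet()
    {
        // Note: a TreeSet ordered by timestamp drops objects with identical timestamps.
        return new TreeSet<>(Comparator.comparing(MyObject::getTimestamp));
    }

    private DayByDayIterator(Stream<MyObject> stream)
    {
        this.incoming = stream.iterator();
        firstOfNextDay = incoming.next(); // throws NoSuchElementException on an empty stream
        nextDaysObjects.add(firstOfNextDay);
        next(); // primes 'next' with the first sorted element
    }

    @Override
    public boolean hasNext()
    {
        return next != null;
    }

    @Override
    public MyObject next()
    {
        // Load another day when the current one is used up and a look-ahead element exists.
        if (currentDay == null || (!currentDay.hasNext() && firstOfNextDay != null))
        {
            nextDay();
        }
        MyObject result = next;
        this.next = currentDay.hasNext() ? currentDay.next() : null;
        return result;
    }

    private void nextDay()
    {
        LocalDate day = firstOfNextDay.getTimestamp().toLocalDate();
        firstOfNextDay = null;
        // Read until the first element of a different day (or the end of the source).
        while (incoming.hasNext())
        {
            MyObject candidate = incoming.next();
            if (!candidate.getTimestamp().toLocalDate().isEqual(day))
            {
                firstOfNextDay = candidate;
                break;
            }
            nextDaysObjects.add(candidate);
        }
        currentDay = nextDaysObjects.iterator();
        nextDaysObjects = newDaySet();
        if (firstOfNextDay != null)
        {
            nextDaysObjects.add(firstOfNextDay);
        }
    }
}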
Use it like this:
public static void main(String[] args)
{
    Stream<MyObject> stream = Stream.of(
        new MyObject(LocalDateTime.now().plusHours(1)),
        new MyObject(LocalDateTime.now()),
        new MyObject(LocalDateTime.now().plusDays(1).plusHours(2)),
        new MyObject(LocalDateTime.now().plusDays(1)),
        new MyObject(LocalDateTime.now().plusDays(1).plusHours(1)),
        new MyObject(LocalDateTime.now().plusDays(2)),
        new MyObject(LocalDateTime.now().plusDays(2).plusHours(1)));
    DayByDayIterator.streamOf(stream).forEach(System.out::println);
}
------------------- Output -----------------
2017-04-30T17:39:46.353
2017-04-30T18:39:46.333
2017-05-01T17:39:46.353
2017-05-01T18:39:46.353
2017-05-01T19:39:46.353
2017-05-02T17:39:46.353
2017-05-02T18:39:46.353
Explanation:
currentDay and next are the basis for the iterator, while firstOfNextDay and nextDaysObjects already look ahead to the first element of the next day. When currentDay is exhausted, nextDay() is called and continues adding incoming's elements to nextDaysObjects until the following day is reached, then turns nextDaysObjects into currentDay.
One thing: if the incoming stream is null or empty, it will fail. You can test for null, but the empty case requires catching an exception in the factory method. I did not add this, to keep the code readable.
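One way to avoid the empty-stream failure without catching an exception is to ask the source iterator's hasNext() before reading the first element. The following is a minimal sketch of a generic variant built on that idea; GroupSortingIterator and sortedWithinGroups are hypothetical names, not part of the answer above. It buffers each run of consecutive elements with the same key in a List, so elements with equal timestamps are kept, and it rejects a null stream up front.

```java
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical generic variant: sorts each run of consecutive elements that share a key.
final class GroupSortingIterator<T> implements Iterator<T> {
    private final Iterator<T> incoming;
    private final Function<? super T, ?> groupKey;
    private final Comparator<? super T> order;
    private Iterator<T> currentGroup = Collections.emptyIterator();
    private T pending;          // first element of the next group, already read
    private boolean hasPending;

    private GroupSortingIterator(Iterator<T> incoming, Function<? super T, ?> groupKey,
                                 Comparator<? super T> order) {
        this.incoming = incoming;
        this.groupKey = groupKey;
        this.order = order;
    }

    static <T> Stream<T> sortedWithinGroups(Stream<T> incoming, Function<? super T, ?> groupKey,
                                            Comparator<? super T> order) {
        Objects.requireNonNull(incoming, "incoming"); // a null stream is rejected immediately
        Iterator<T> it = new GroupSortingIterator<>(incoming.iterator(), groupKey, order);
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false);
    }

    @Override
    public boolean hasNext() {
        if (!currentGroup.hasNext()) {
            loadNextGroup();
        }
        return currentGroup.hasNext();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return currentGroup.next();
    }

    private void loadNextGroup() {
        if (!hasPending && incoming.hasNext()) {
            pending = incoming.next();
            hasPending = true;
        }
        if (!hasPending) {
            return; // source exhausted, or empty from the start
        }
        List<T> group = new ArrayList<>();
        Object key = groupKey.apply(pending);
        group.add(pending);
        pending = null;
        hasPending = false;
        // Read until the first element with a different key (or the end of the source).
        while (incoming.hasNext()) {
            T candidate = incoming.next();
            if (!Objects.equals(key, groupKey.apply(candidate))) {
                pending = candidate;
                hasPending = true;
                break;
            }
            group.add(candidate);
        }
        group.sort(order);
        currentGroup = group.iterator();
    }

    public static void main(String[] args) {
        LocalDateTime start = LocalDateTime.of(2017, 4, 30, 0, 0);
        Stream<LocalDateTime> unsorted = Stream.of(
                start.plusHours(18), start.plusHours(17), start.plusDays(1).plusHours(9));
        sortedWithinGroups(unsorted, LocalDateTime::toLocalDate,
                Comparator.<LocalDateTime>naturalOrder()).forEach(System.out::println);
    }
}
```

Nothing is read from the source until the resulting stream is consumed, and at most one group plus one look-ahead element is held in memory at a time.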
I hope this is what you need; let me know how it goes.