
1. 问题背景与数据模型
在实际业务场景中,我们经常需要处理包含时间戳的重复数据。例如,一个币种在同一天可能会有多次接收记录,但我们只关心当天最新的一条。我们的目标是,对于给定的币种(例如usd),在每一天中,只保留该币种在当天所有记录中lastreceived时间戳最新的那一条。
首先,定义数据模型Currency类:
import java.time.LocalDateTime;
import java.util.Objects;
class Currency {
private Integer id;
private String name;
private LocalDateTime lastReceived;
// 构造函数
public Currency(Integer id, String name, LocalDateTime lastReceived) {
this.id = id;
this.name = name;
this.lastReceived = lastReceived;
}
// Getter 方法
public Integer getId() {
return id;
}
public String getName() {
return name;
}
public LocalDateTime getLastReceived() {
return lastReceived;
}
// 为了方便打印和比较,重写toString, equals, hashCode
@Override
public String toString() {
return "Currency{" +
"id=" + id +
", name='" + name + '\'' +
", lastReceived=" + lastReceived +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Currency currency = (Currency) o;
return Objects.equals(id, currency.id) &&
Objects.equals(name, currency.name) &&
Objects.equals(lastReceived, currency.lastReceived);
}
@Override
public int hashCode() {
return Objects.hash(id, name, lastReceived);
}
}假设我们有如下数据:
ID NAME LAST_RECEIVED -- ---- ------------- 1 USD 2022-05-18 09:04:01.545899000 2 USD 2022-05-18 08:04:01.545899000 3 USD 2022-05-19 08:04:01.545899000 4 USD 2022-05-20 08:04:01.545899000 5 USD 2022-05-20 11:04:01.545899000 6 BUSD 2022-05-18 08:04:01.545899000
我们期望的结果是:
ID NAME LAST_RECEIVED -- ---- ------------- 1 USD 2022-05-18 09:04:01.545899000 (18号USD最新) 3 USD 2022-05-19 08:04:01.545899000 (19号USD最新) 5 USD 2022-05-20 11:04:01.545899000 (20号USD最新)
2. 使用Java 8 Stream API进行数据处理
Java 8的Stream API提供了强大的数据处理能力,非常适合在内存中对集合进行分组、过滤和转换。
立即学习“Java免费学习笔记(深入)”;
2.1 核心思路:分组与筛选
要实现上述目标,我们需要执行以下步骤:
- 分组:根据币种名称(name)和日期(lastReceived的日期部分)进行分组。
- 筛选:在每个分组内部,找到lastReceived时间戳最大的那条记录。
- 收集:将筛选出的记录收集到一个新的列表中。
2.2 通用解决方案:获取所有币种在各日期的最新记录
如果我们想获取所有币种在各自日期上的最新记录,可以使用Collectors.groupingBy结合Collectors.collectingAndThen和Collectors.maxBy。
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.stream.Collectors;
public class CurrencyDataProcessor {
public static void main(String[] args) {
// 模拟数据
List data = Arrays.asList(
new Currency(1, "USD", LocalDateTime.parse("2022-05-18 09:04:01.545899", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"))),
new Currency(2, "USD", LocalDateTime.parse("2022-05-18 08:04:01.545899", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"))),
new Currency(3, "USD", LocalDateTime.parse("2022-05-19 08:04:01.545899", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"))),
new Currency(4, "USD", LocalDateTime.parse("2022-05-20 08:04:01.545899", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"))),
new Currency(5, "USD", LocalDateTime.parse("2022-05-20 11:04:01.545899", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"))),
new Currency(6, "BUSD", LocalDateTime.parse("2022-05-18 08:04:01.545899", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS")))
);
// 获取所有币种在各日期的最新记录
List lastByDateForAllCurrencies = new ArrayList<>(data
.stream() // Stream
.collect(Collectors.groupingBy(
// 分组键:币种名称 + 日期 (LocalDateTime.toLocalDate())
curr -> Arrays.asList(curr.getName(), curr.getLastReceived().toLocalDate()),
Collectors.collectingAndThen(
// 在每个分组内,找到lastReceived最大的Currency对象
Collectors.maxBy(Comparator.comparing(Currency::getLastReceived)),
// Optional -> Currency (Optional::get在确保存在值时是安全的)
Optional::get
)
)) // 结果是一个 Map, Currency>
.values() // 获取Map中所有的值 (即筛选后的Currency对象)
);
System.out.println("所有币种在各日期的最新记录 (未排序):");
lastByDateForAllCurrencies.forEach(System.out::println);
// 如果需要排序,可以对结果列表进行排序
lastByDateForAllCurrencies.sort(
Comparator.comparing(Currency::getName)
.thenComparing(Currency::getLastReceived)
);
System.out.println("\n所有币种在各日期的最新记录 (按币种和时间排序):");
lastByDateForAllCurrencies.forEach(System.out::println);
}
}
代码解释:
- Collectors.groupingBy: 用于将流中的元素按照指定的键进行分组。
- 分组键 curr -> Arrays.asList(curr.getName(), curr.getLastReceived().toLocalDate()):我们创建了一个包含币种名称和日期(通过toLocalDate()从LocalDateTime提取)的列表作为复合键,这样可以确保按币种和日期进行双重分组。
- Collectors.collectingAndThen: 这是一个下游收集器,它首先执行一个收集操作(例如maxBy),然后将结果通过一个函数进行转换。
- Collectors.maxBy(Comparator.comparing(Currency::getLastReceived)): 在每个分组内部,找到lastReceived时间戳最大的Currency对象。maxBy返回一个Optional
。 - Optional::get: 将Optional
转换为Currency对象。注意: 在本例中,由于groupingBy的每个组至少包含一个元素,maxBy的结果必然是非空的Optional,因此使用get()是安全的。但在其他场景下,如果Optional可能为空,应使用orElse或orElseThrow等方法处理。
- Collectors.maxBy(Comparator.comparing(Currency::getLastReceived)): 在每个分组内部,找到lastReceived时间戳最大的Currency对象。maxBy返回一个Optional
- .values():从Map中获取所有值,即我们筛选出的Currency对象。
- .sort():如果需要按特定顺序(例如按币种名称和时间)排列结果,可以对最终列表进行排序。
2.3 特定币种解决方案:获取USD在各日期的最新记录
如果只需要处理特定币种(例如"USD")的数据,可以在分组前添加一个filter操作,并且分组键可以简化。
// 获取特定币种 (USD) 在各日期的最新记录
List lastUSDByDate = new ArrayList<>(data
.stream()
.filter(curr -> "USD".equalsIgnoreCase(curr.getName())) // 过滤出USD币种
.collect(Collectors.groupingBy(
curr -> curr.getLastReceived().toLocalDate(), // 仅按日期分组
Collectors.collectingAndThen(
Collectors.maxBy(Comparator.comparing(Currency::getLastReceived)),
Optional::get
)
))
.values()
);
// 对结果进行排序
lastUSDByDate.sort(Comparator.comparing(Currency::getLastReceived));
System.out.println("\nUSD币种在各日期的最新记录:");
lastUSDByDate.forEach(System.out::println); 代码解释:
- filter(curr -> "USD".equalsIgnoreCase(curr.getName())):在处理流之前,首先过滤出name为"USD"的Currency对象,提高了效率。
- curr -> curr.getLastReceived().toLocalDate():由于已经过滤了币种,分组键只需包含日期即可。
3. 数据库原生SQL查询方案
对于大量数据,将数据处理逻辑下推到数据库通常是更高效的选择。虽然JPA等ORM框架对窗口函数支持有限,但我们可以使用原生SQL查询来实现。
3.1 使用窗口函数实现
SQL中的窗口函数(如ROW_NUMBER())非常适合这种“每组取N条”的需求。以下是一个针对PostgreSQL的示例:
SELECT id, name, last_received
FROM (
SELECT c.*,
ROW_NUMBER() OVER (
PARTITION BY name, to_char(last_received, 'yyyy-MM-dd')
ORDER BY last_received DESC
) AS rn -- 为每个分区内的记录分配行号,按last_received降序
FROM Currency c
WHERE c.name = :currName -- 可选:过滤特定币种
) tbl
WHERE rn = 1 -- 选取每个分区中行号为1的记录,即最新的一条
ORDER BY last_received; -- 对最终结果进行排序SQL查询解释:
- ROW_NUMBER() OVER (PARTITION BY name, to_char(last_received, 'yyyy-MM-dd') ORDER BY last_received DESC):
- PARTITION BY name, to_char(last_received, 'yyyy-MM-dd'): 将数据按照币种名称和日期(last_received的日期部分)进行分区。to_char函数用于将timestamp转换为日期字符串,以便作为分区键。
- ORDER BY last_received DESC: 在每个分区内部,按照last_received时间戳降序排列。这意味着最新的记录将获得行号1。
- WHERE rn = 1: 筛选出每个分区中行号为1的记录,这正是我们需要的每个币种、每天的最新记录。
- WHERE c.name = :currName: 这是一个可选的过滤条件,用于只查询特定币种的数据。
- ORDER BY last_received: 对最终结果按时间排序。
3.2 在JPA中使用原生查询
如果使用Spring Data JPA,可以通过@Query注解结合nativeQuery = true来执行原生SQL查询。
import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.query.Param; import java.util.List; // 假设 CurrencyRepository 继承 JpaRepository public interface CurrencyRepository extends JpaRepository{ @Query(nativeQuery = true, value = """ SELECT id, name, last_received FROM ( SELECT c.*, ROW_NUMBER() OVER ( PARTITION BY name, to_char(last_received, 'yyyy-MM-dd') ORDER BY last_received DESC ) AS rn FROM Currency c WHERE c.name = :currName ) tbl WHERE rn = 1 ORDER BY last_received """) List findLastByDateByCurrencyName(@Param("currName") String currName); // 如果需要获取所有币种的最新记录,可以移除 WHERE c.name = :currName 部分 @Query(nativeQuery = true, value = """ SELECT id, name, last_received FROM ( SELECT c.*, ROW_NUMBER() OVER ( PARTITION BY name, to_char(last_received, 'yyyy-MM-dd') ORDER BY last_received DESC ) AS rn FROM Currency c ) tbl WHERE rn = 1 ORDER BY name, last_received """) List findAllLastByDate(); }
4. 注意事项与总结
-
性能考量:
- Java Stream API:适用于数据量较小,或数据已加载到内存中的场景。其优势在于代码简洁、易读,且充分利用了Java 8的函数式编程特性。但如果数据量非常大(例如数百万条),在内存中处理可能会消耗大量内存和CPU资源。
- SQL原生查询:对于大规模数据集,将逻辑下推到数据库是更优的选择。数据库针对这类分组和排序操作进行了高度优化,能够更高效地利用索引和内部算法。
- 日期精度:在Java Stream API中,我们使用LocalDateTime.toLocalDate()来获取日期部分。在SQL中,我们使用to_char(last_received, 'yyyy-MM-dd')来提取日期部分。确保两者在处理日期时的一致性。
- Optional::get的安全性:在Collectors.maxBy的场景中,groupingBy确保了每个组至少有一个元素,所以maxBy返回的Optional不会为空,get()是安全的。但在其他可能返回空Optional的场景,务必使用orElse、orElseThrow或ifPresent等方法进行安全处理。
- 灵活性:Java Stream API提供了极大的灵活性,可以轻松调整分组键、筛选条件和聚合逻辑。SQL窗口函数同样强大,但需要对SQL语法有较深的理解。
综上所述,根据具体的数据量、性能要求和现有技术栈,可以选择最适合的方案。对于中小型数据集,Java 8 Stream API提供了一种优雅且高效的内存处理方式;而对于大型数据集或需要利用数据库强大能力的场景,原生SQL查询(尤其是窗口函数)则是更专业的选择。










