
在分布式应用中,统计当前登录用户数量是一个常见需求。通常,开发者会选择将用户计数存储在共享缓存中,例如infinispan。当用户登录时,从缓存中获取当前值,加1后再存回缓存。然而,在多用户同时登录的高并发场景下,这种简单的“读取-修改-写入”操作序列极易引发竞态条件(race condition),导致计数不准确。例如,两个用户几乎同时登录,都读取到旧的计数n,然后各自加1并写回n+1,最终结果本应是n+2,却只变成了n+1,造成数据丢失。为了解决这一问题,infinispan提供了多种强大的并发控制机制。
Infinispan提供了一个专门的分布式计数器(Counters)API,旨在解决这种原子性增减操作的需求。它支持强一致性计数器(StrongCounter)和弱一致性计数器(WeakCounter)。对于用户登录统计这种需要精确计数的场景,通常推荐使用强一致性计数器。
特点:
示例代码:
大小仅1兆左右 ,足够轻便的商城系统; 易部署,上传空间即可用,安全,稳定; 容易操作,登陆后台就可设置装饰网站; 并且使用异步技术处理网站数据,表现更具美感。 前台呈现页面,兼容主流浏览器,DIV+CSS页面设计; 如果您有一定的网页设计基础,还可以进行简易的样式修改,二次开发, 发布新样式,调整网站结构,只需修改css目录中的css.css文件即可。 商城网站完全独立,网站源码随时可供您下载
3
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.counter.api.CounterManager;
import org.infinispan.counter.api.StrongCounter;
import org.infinispan.counter.api.CounterConfiguration;
import org.infinispan.counter.api.CounterType;
public class UserCounterService {
private final RemoteCacheManager cacheManager;
private final StrongCounter loggedInUsersCounter;
public UserCounterService(RemoteCache RemoteCacheManager cacheManager) {
this.cacheManager = cacheManager;
CounterManager counterManager = cacheManager.get
CounterManager();
// 获取或创建强一致性计数器
// 可以配置初始值、上限、下限等
this.loggedInUsersCounter = counterManager.define
StrongCounter("loggedInUsers",
CounterConfiguration.builder(CounterType.BOUNDED_STRONG)
.initialValue(0)
.lowerBound(0)
.build());
// 或者直接获取已定义的计数器
// this.loggedInUsersCounter = counterManager.getStrongCounter("loggedInUsers");
}
/**
* 用户登录时调用,增加计数
*/
public void userLoggedIn() {
loggedInUsersCounter.increment();
System.out.println("用户登录,当前用户数: " + loggedInUsersCounter.getValue());
}
/**
* 用户登出时调用,减少计数
*/
public void userLoggedOut() {
loggedInUsersCounter.decrement();
System.out.println("用户登出,当前用户数: " + loggedInUsersCounter.getValue());
}
/**
* 获取当前登录用户数
* @return 当前用户数
*/
public long getCurrentLoggedInUsers() {
return loggedInUsersCounter.getValue();
}
public static void main(String[] args) {
// 假设已经配置并启动了RemoteCacheManager
RemoteCacheManager remoteCacheManager = new RemoteCacheManager("127.0.0.1:11222"); // 示例配置
UserCounterService service = new UserCounterService(remoteCacheManager);
// 模拟多个用户同时登录
for (int i = 0; i < 10; i++) {
new Thread(service::userLoggedIn).start();
}
try {
Thread.sleep(1000); // 等待所有线程完成
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("最终登录用户数: " + service.getCurrentLoggedInUsers());
remoteCacheManager.stop();
}
}Infinispan支持分布式事务,可以将一系列操作封装在一个原子单元中。这意味着,在一个事务中的所有操作要么全部成功,要么全部失败。通过将“读取-修改-写入”逻辑放入一个事务中,可以确保其原子性。
特点:
示例代码:
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
import org.infinispan.commons.api.BasicCache;
import org.infinispan.commons.api.CacheContainer;
import org.infinispan.commons.api.CacheContainer.Builder;
import org.infinispan.transaction.LockingMode;
import org.infinispan.transaction.TransactionMode;
import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;
public class UserLoginTransactionalService {
private final RemoteCache<String, Long> userCountCache;
private final TransactionManager tm; // JBoss Transaction Manager or similar
private final UserTransaction ut;
public UserLoginTransactionalService(RemoteCacheManager cacheManager) throws Exception {
// 确保缓存配置为事务模式
// 在服务器端或客户端配置中设置 transactionMode(TransactionMode.TRANSACTIONAL)
this.userCountCache = cacheManager.getCache("userCountCache");
// 获取JTA事务管理器和用户事务
// 在实际应用中,通常通过JNDI查找或DI框架注入
this.tm = com.arjuna.ats.jta.TransactionManager.transactionManager(); // 示例:使用Narayana
this.ut = com.arjuna.ats.jta.UserTransaction.userTransaction();
}
public void userLoggedIn() throws Exception {
ut.begin(); // 开启事务
try {
Long currentCount = userCountCache.get("loggedInUsers");
if (currentCount == null) {
currentCount = 0L;
}
userCountCache.put("loggedInUsers", currentCount + 1);
ut.commit(); // 提交事务
System.out.println("用户登录(事务),当前用户数: " + userCountCache.get("loggedInUsers"));
} catch (Exception e) {
ut.rollback(); // 回滚事务
System.err.println("用户登录事务失败: " + e.getMessage());
throw e;
}
}
public long getCurrentLoggedInUsers() {
Long count = userCountCache.get("loggedInUsers");
return count != null ? count : 0L;
}
public static void main(String[] args) throws Exception {
RemoteCacheManager remoteCacheManager = new RemoteCacheManager("127.0.0.1:11222"); // 示例配置
// 确保服务器端缓存 'userCountCache' 配置为 TRANSACTIONAL 模式
UserLoginTransactionalService service = new UserLoginTransactionalService(remoteCacheManager);
// 初始化计数
service.userCountCache.put("loggedInUsers", 0L);
// 模拟多个用户同时登录
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
service.userLoggedIn();
} catch (Exception e) {
// 处理异常
}
}).start();
}
try {
Thread.sleep(2000); // 等待所有线程完成
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("最终登录用户数: " + service.getCurrentLoggedInUsers());
remoteCacheManager.stop();
}
}注意事项:
Infinispan的Hot Rod客户端API提供了一些条件操作,例如replace(K key, V oldValue, V newValue)和putIfAbsent(K key, V value)。这些操作只有在满足特定条件时才会执行,可以用于实现乐观锁机制。
特点:
示例代码:
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
public class UserLoginVersionedService {
private final RemoteCache<String, Long> userCountCache;
public UserLoginVersionedService(RemoteCacheManager cacheManager) {
this.userCountCache = cacheManager.getCache("userCountCache");
}
public void userLoggedIn() {
int maxRetries = 5; // 最大重试次数
for (int i = 0; i < maxRetries; i++) {
Long currentCount = userCountCache.get("loggedInUsers");
if (currentCount == null) {
currentCount = 0L;
}
Long newCount = currentCount + 1;
// 尝试原子性替换:只有当键'loggedInUsers'的值仍然是currentCount时,才将其更新为newCount
boolean success = userCountCache.replace("loggedInUsers", currentCount, newCount);
if (success) {
System.out.println("用户登录(版本化),当前用户数: " + newCount);
return; // 更新成功,退出循环
} else {
// 更新失败,说明其他线程在此期间修改了值,需要重试
System.out.println("更新冲突,重试... (当前尝试: " + (i + 1) + ")");
try {
Thread.sleep(100); // 稍作等待,避免紧密循环
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
System.err.println("用户登录失败,达到最大重试次数。");
}
public long getCurrentLoggedInUsers() {
Long count = userCountCache.get("loggedInUsers");
return count != null ? count : 0L;
}
public static void main(String[] args) {
RemoteCacheManager remoteCacheManager = new RemoteCacheManager("127.0.0.1:11222"); // 示例配置
UserLoginVersionedService service = new UserLoginVersionedService(remoteCacheManager);
// 初始化计数
service.userCountCache.put("loggedInUsers", 0L);
// 模拟多个用户同时登录
for (int i = 0; i < 10; i++) {
new Thread(service::userLoggedIn).start();
}
try {
Thread.sleep(2000); // 等待所有线程完成
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("最终登录用户数: " + service.getCurrentLoggedInUsers());
remoteCacheManager.stop();
}
}在分布式系统中处理并发计数问题是构建高可用和数据一致性应用的关键。Infinispan作为一款强大的分布式缓存,提供了多种机制来应对这些挑战。对于用户登录计数这类常见场景,Infinispan的分布式计数器提供了最直接和高效的解决方案。当业务逻辑更复杂时,事务和版本化操作则提供了更灵活的并发控制手段。理解并选择最适合你应用场景的Infinispan并发控制策略,是确保系统健壮性和数据准确性的重要一步。
以上就是Infinispan中实现并发安全计数器:解决分布式应用中的用户登录统计挑战的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号