文章目录
- 1.MassMailTask.java 延迟任务bean对象
- 2.MassMailTaskService.java
- 3.RedisUtil.java
1.MassMailTask.java 延迟任务bean对象
package com.sunxiansheng.user.delayQueue;
import lombok.Data;
import java.util.Date;
/**
 * Payload describing one delayed mass-mail task.
 *
 * <p>Lombok's {@code @Data} generates the accessors ({@code getTaskId()},
 * {@code getStartTime()}, setters) plus {@code equals}/{@code hashCode}/{@code toString}.
 */
@Data
public class MassMailTask {
/** Identifier of the task. */
private Long taskId;
/** Point in time at which the task is supposed to start. */
private Date startTime;
}
2.MassMailTaskService.java
package com.sunxiansheng.user.delayQueue;
import com.alibaba.fastjson.JSON;
import com.sunxiansheng.redis.util.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.*;
import java.util.stream.Collectors;
@Slf4j
@Service
public class MassMailTaskService {
public static final String MASS_TASK_KEY = "massTaskMail";
@Resource
private RedisUtil redisUtil;
public void pushMassMailTaskQueue(MassMailTask massMailTask) {
Date startTime = massMailTask.getStartTime();
if (startTime == null) {
return;
}
if (startTime.compareTo(new Date()) <= 0) {
return;
}
log.info("定时任务加入队列,massTask:{}", JSON.toJSONString(massMailTask));
redisUtil.zAdd(MASS_TASK_KEY, massMailTask.getTaskId(), startTime.getTime());
}
public Set<Long> poolMassTaskQueue() {
Set<Object> set = redisUtil.zRangeByScore(MASS_TASK_KEY, 0, System.currentTimeMillis());
if (CollectionUtils.isEmpty(set)) {
return Collections.emptySet();
}
redisUtil.zRemoveBySet(MASS_TASK_KEY, set);
return set.stream().map(n -> {
String string = n.toString();
return Long.parseLong(string);
}).collect(Collectors.toSet());
}
}
3.RedisUtil.java
package com.sunxiansheng.redis.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
 * Thin convenience wrapper around {@link RedisTemplate} for value, hash, list, set and
 * sorted-set commands.
 *
 * <p>Unless noted otherwise, every method runs through {@link #execute(RedisOperation)}, which
 * logs any exception and yields {@code null} instead of propagating it. Methods returning an
 * object type can therefore return {@code null} on failure; methods returning primitive
 * {@code boolean} report {@code false} in that case.
 */
@Component
public class RedisUtil {

    private static final Logger logger = LoggerFactory.getLogger(RedisUtil.class);

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    /** Separator placed between the parts of a composed cache key. */
    private static final String CACHE_KEY_SEPARATOR = ".";

    /**
     * Joins the given parts with {@code "."} into a single key.
     *
     * @param strObjs key parts, in order
     * @return the joined key
     */
    public String buildKey(String... strObjs) {
        return String.join(CACHE_KEY_SEPARATOR, strObjs);
    }

    // ---------------------------------------------------------------- keys / values

    /**
     * @param key the key to look up
     * @return {@code true} only if the template reports that the key exists; {@code false} when
     *     it does not or the operation failed
     */
    public boolean exists(String key) {
        return isTrue(() -> redisTemplate.hasKey(key));
    }

    /**
     * @param key the key to delete
     * @return {@code true} only if the template reports the key as deleted; {@code false}
     *     otherwise, including on failure
     */
    public boolean delete(String key) {
        return isTrue(() -> redisTemplate.delete(key));
    }

    /** Stores {@code value} under {@code key} without expiry. */
    public void set(String key, Object value) {
        execute(() -> {
            redisTemplate.opsForValue().set(key, value);
            return null;
        });
    }

    /** Stores {@code value} under {@code key} with the given time to live. */
    public void set(String key, Object value, long timeout, TimeUnit unit) {
        execute(() -> {
            redisTemplate.opsForValue().set(key, value, timeout, unit);
            return null;
        });
    }

    /**
     * Stores {@code value} with the given time to live only if {@code key} is absent.
     *
     * @return {@code true} only if the template reports that the value was set; {@code false}
     *     otherwise, including on failure
     */
    public boolean setIfAbsent(String key, Object value, long timeout, TimeUnit unit) {
        return isTrue(() -> redisTemplate.opsForValue().setIfAbsent(key, value, timeout, unit));
    }

    /**
     * Reads the value stored under {@code key} and casts it via {@link #castValue}.
     *
     * @return the value, or {@code null} if absent, not castable to {@code clazz}, or on failure
     */
    public <T> T get(String key, Class<T> clazz) {
        return execute(() -> castValue(redisTemplate.opsForValue().get(key), clazz));
    }

    /** Increments the numeric value stored under {@code key} by {@code delta}. */
    public void increment(String key, long delta) {
        execute(() -> {
            redisTemplate.opsForValue().increment(key, delta);
            return null;
        });
    }

    // ---------------------------------------------------------------- hashes

    /** Puts {@code value} under {@code hashKey} in the hash stored at {@code key}. */
    public void hPut(String key, String hashKey, Object value) {
        execute(() -> {
            redisTemplate.opsForHash().put(key, hashKey, value);
            return null;
        });
    }

    /**
     * Reads one hash field and casts it via {@link #castValue}.
     *
     * @return the field value, or {@code null} if absent, not castable, or on failure
     */
    public <T> T hGet(String key, String hashKey, Class<T> clazz) {
        return execute(() -> castValue(redisTemplate.opsForHash().get(key, hashKey), clazz));
    }

    /** @return all entries of the hash stored at {@code key}, or {@code null} on failure */
    public Map<Object, Object> hGetAll(String key) {
        return execute(() -> redisTemplate.opsForHash().entries(key));
    }

    /** Deletes the given fields from the hash stored at {@code key}. */
    public void hDelete(String key, Object... hashKey) {
        execute(() -> {
            redisTemplate.opsForHash().delete(key, hashKey);
            return null;
        });
    }

    /**
     * Scans the hash stored at {@code key}, copying each entry into a map and deleting it from
     * Redis as it is visited.
     *
     * <p>Scan and delete are separate commands, so this is not atomic. If an exception occurs
     * midway it is logged and the entries collected so far are returned.
     *
     * @return the entries that were read; never {@code null}
     */
    public Map<Object, Object> hGetAndDelete(String key) {
        Map<Object, Object> map = new HashMap<>();
        try (Cursor<Map.Entry<Object, Object>> cursor = redisTemplate.opsForHash().scan(key, ScanOptions.NONE)) {
            while (cursor.hasNext()) {
                Map.Entry<Object, Object> entry = cursor.next();
                Object hashKey = entry.getKey();
                Object hashValue = entry.getValue();
                map.put(hashKey, hashValue);
                redisTemplate.opsForHash().delete(key, hashKey);
            }
        } catch (Exception e) {
            logger.error("Redis hGetAndDelete error: key={}", key, e);
        }
        return map;
    }

    // ---------------------------------------------------------------- lists

    /** Pushes {@code value} onto the left end of the list stored at {@code key}. */
    public void lPush(String key, Object value) {
        execute(() -> {
            redisTemplate.opsForList().leftPush(key, value);
            return null;
        });
    }

    /** Pushes {@code value} onto the right end of the list stored at {@code key}. */
    public void rPush(String key, Object value) {
        execute(() -> {
            redisTemplate.opsForList().rightPush(key, value);
            return null;
        });
    }

    /** @return the element popped from the left end, cast via {@link #castValue}, or {@code null} */
    public <T> T lPop(String key, Class<T> clazz) {
        return execute(() -> castValue(redisTemplate.opsForList().leftPop(key), clazz));
    }

    /** @return the element popped from the right end, cast via {@link #castValue}, or {@code null} */
    public <T> T rPop(String key, Class<T> clazz) {
        return execute(() -> castValue(redisTemplate.opsForList().rightPop(key), clazz));
    }

    /** @return the list elements between {@code start} and {@code end}, or {@code null} on failure */
    public List<Object> lRange(String key, long start, long end) {
        return execute(() -> redisTemplate.opsForList().range(key, start, end));
    }

    // ---------------------------------------------------------------- sets

    /** Adds the given values to the set stored at {@code key}. */
    public void sAdd(String key, Object... values) {
        execute(() -> {
            redisTemplate.opsForSet().add(key, values);
            return null;
        });
    }

    /** @return all members of the set stored at {@code key}, or {@code null} on failure */
    public Set<Object> sMembers(String key) {
        return execute(() -> redisTemplate.opsForSet().members(key));
    }

    /**
     * @return {@code true} only if the template reports {@code value} as a member; {@code false}
     *     otherwise, including on failure
     */
    public boolean sIsMember(String key, Object value) {
        return isTrue(() -> redisTemplate.opsForSet().isMember(key, value));
    }

    /** @return a member popped from the set stored at {@code key}, or {@code null} */
    public Object sPop(String key) {
        return execute(() -> redisTemplate.opsForSet().pop(key));
    }

    /** @return the size of the set stored at {@code key}, or {@code null} on failure */
    public Long sCard(String key) {
        return execute(() -> redisTemplate.opsForSet().size(key));
    }

    // ---------------------------------------------------------------- sorted sets

    /**
     * Adds {@code value} with {@code score} to the sorted set stored at {@code key}.
     *
     * @return {@code true} only if the template reports the member as added; {@code false}
     *     otherwise, including on failure
     */
    public boolean zAdd(String key, Object value, double score) {
        return isTrue(() -> redisTemplate.opsForZSet().add(key, value, score));
    }

    /** @return the size of the sorted set stored at {@code key}, or {@code null} on failure */
    public Long zCard(String key) {
        return execute(() -> redisTemplate.opsForZSet().size(key));
    }

    /** @return the members between ranks {@code start} and {@code end}, or {@code null} on failure */
    public Set<Object> zRange(String key, long start, long end) {
        return execute(() -> redisTemplate.opsForZSet().range(key, start, end));
    }

    /** @return the number of members removed, or {@code null} on failure */
    public Long zRemove(String key, Object value) {
        return execute(() -> redisTemplate.opsForZSet().remove(key, value));
    }

    /**
     * Removes all given members from the sorted set stored at {@code key} in a single call.
     *
     * @param values members to remove; {@code null} or empty removes nothing
     * @return the number of members removed ({@code 0} for null/empty input), or {@code null}
     *     on failure
     */
    public Long zRemoveByList(String key, List<Object> values) {
        return zRemoveAll(key, values);
    }

    /**
     * Removes all given members from the sorted set stored at {@code key} in a single call.
     *
     * @param values members to remove; {@code null} or empty removes nothing
     * @return the number of members removed ({@code 0} for null/empty input), or {@code null}
     *     on failure
     */
    public Long zRemoveBySet(String key, Set<Object> values) {
        return zRemoveAll(key, values);
    }

    /** @return the score of {@code value}, or {@code null} if absent or on failure */
    public Double zScore(String key, Object value) {
        return execute(() -> redisTemplate.opsForZSet().score(key, value));
    }

    /** @return the members whose score lies in {@code [start, end]}, or {@code null} on failure */
    public Set<Object> zRangeByScore(String key, double start, double end) {
        return execute(() -> redisTemplate.opsForZSet().rangeByScore(key, start, end));
    }

    /** @return the score after adding {@code score} to the member's score, or {@code null} on failure */
    public Double zIncrementScore(String key, Object value, double score) {
        return execute(() -> redisTemplate.opsForZSet().incrementScore(key, value, score));
    }

    /** @return the rank of {@code value}, or {@code null} if absent or on failure */
    public Long zRank(String key, Object value) {
        return execute(() -> redisTemplate.opsForZSet().rank(key, value));
    }

    /** @return members with scores between ranks {@code start} and {@code end}, or {@code null} on failure */
    public Set<ZSetOperations.TypedTuple<Object>> zRangeWithScores(String key, long start, long end) {
        return execute(() -> redisTemplate.opsForZSet().rangeWithScores(key, start, end));
    }

    /** @return members with scores whose score lies in {@code [min, max]}, or {@code null} on failure */
    public Set<ZSetOperations.TypedTuple<Object>> zRangeByScoreWithScores(String key, double min, double max) {
        return execute(() -> redisTemplate.opsForZSet().rangeByScoreWithScores(key, min, max));
    }

    /** @return the reverse rank of {@code value}, or {@code null} if absent or on failure */
    public Long zRevRank(String key, Object value) {
        return execute(() -> redisTemplate.opsForZSet().reverseRank(key, value));
    }

    /** @return the number of members whose score lies in {@code [min, max]}, or {@code null} on failure */
    public Long zCount(String key, double min, double max) {
        return execute(() -> redisTemplate.opsForZSet().count(key, min, max));
    }

    /** @return the number of members removed by score range, or {@code null} on failure */
    public Long zRemoveByScore(String key, double min, double max) {
        return execute(() -> redisTemplate.opsForZSet().removeRangeByScore(key, min, max));
    }

    /** @return the number of members removed by rank range, or {@code null} on failure */
    public Long zRemoveByRank(String key, long start, long end) {
        return execute(() -> redisTemplate.opsForZSet().removeRange(key, start, end));
    }

    // ---------------------------------------------------------------- internals

    /**
     * Removes a batch of sorted-set members with one varargs {@code remove} call instead of one
     * call per member.
     */
    private Long zRemoveAll(String key, Collection<Object> values) {
        if (values == null || values.isEmpty()) {
            // Nothing to remove; avoid issuing a removal with no members.
            return 0L;
        }
        return execute(() -> redisTemplate.opsForZSet().remove(key, values.toArray()));
    }

    /**
     * Runs a Boolean-valued operation and maps both a {@code null} result and a failed
     * operation to {@code false}, so callers returning primitive {@code boolean} never unbox
     * {@code null}.
     */
    private boolean isTrue(RedisOperation<Boolean> operation) {
        return Boolean.TRUE.equals(execute(operation));
    }

    /**
     * Runs the operation, logging and swallowing any exception.
     *
     * @return the operation's result, or {@code null} if it threw
     */
    private <T> T execute(RedisOperation<T> operation) {
        try {
            return operation.execute();
        } catch (Exception e) {
            logger.error("Redis operation error", e);
            return null;
        }
    }

    /** A Redis call producing a result of type {@code T}. */
    @FunctionalInterface
    private interface RedisOperation<T> {
        T execute();
    }

    /**
     * Casts {@code value} to {@code clazz}.
     *
     * <p>An {@link Integer} is widened when {@link Long} is requested; every other combination
     * is a plain {@link Class#cast(Object)}.
     *
     * @return the cast value, or {@code null} if {@code value} is {@code null}
     * @throws ClassCastException if {@code value} is not an instance of {@code clazz}
     */
    public <T> T castValue(Object value, Class<T> clazz) {
        if (value == null) {
            return null;
        }
        if (clazz == Long.class && value instanceof Integer) {
            return clazz.cast(((Integer) value).longValue());
        }
        return clazz.cast(value);
    }
}