java-Redis集合
引用包:jedis-3.0.1.jar、commons-pool2-2.6.0.jar
一、从 Redis 集合中实时获取数据:
连接 Redis
import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig;public static Lock lock = new ReentrantLock();
/**
* 连接 Redis
* @param conferenceId
* @return
*/
public String startRedis(String topicId) {
textMessage = "";
String result = "";
try {
JedisPoolConfig config = new JedisPoolConfig();
config.setTimeBetweenEvictionRunsMillis(30000);
config.setMaxWaitMillis(10 * 1000);
config.setMaxIdle(1000);
config.setTestOnBorrow(true);
JedisPool jedisPool = new JedisPool(config, redisIpYJ, Integer.parseInt(redisPortYJ),10000);// 连接 redis 服务端
result = "连接 Redis 成功";
lock.lock();
try {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
getRecordTextNew_Redis(jedisPool,topicId);
}
});
thread.start();
} finally {
lock.unlock();
}
} catch (Exception e) {
result = "连接 Redis 失败:" + e.getMessage();
}
return result;
}
实时获取数据
public static Boolean isSelectRedis = false;//是否继续查询 Redis/**
* 从 Redis 实时获取语音记录文本(党组会)
* @param topicId
*/
public void getRecordTextNew_Redis(JedisPool jedisPool,String topicId) {
Jedis jedis = null;
while (isSelectRedis) {
try {
jedis = jedisPool.getResource(); //取出一个连接
Set<String> results = jedis.zrange("asr:text:"+topicId,0,-1);
for (String result: results) {
//TODO 消费 result
if (StringUtils.isNotEmpty(result)) {
JSONObject resultMsg = JSONObject.parseObject(result);
String text = resultMsg.getString("result");
System.out.println("消息 text:"+text);
String pgs = "1";
String micName = resultMsg.getString("roleName");
String micId = resultMsg.getString("role");
String uId = resultMsg.getString("uid");//段落 ID
if (StringUtils.isNotEmpty(text)) {
String dataText = "<b>" + micName + ":</b>" + text;
String dataText2 = "<div id=""+ uId +""><b>" + micName + ":</b>" + text+"</div>";
textMap.put(uId, dataText2);
System.out.println("消息 dataText:"+dataText);
JSONObject textObj = new JSONObject();
textObj.put("dataText", dataText);
textObj.put("dataPgs", pgs);
textObj.put("dataUId", uId);
try {
Thread.sleep(400);
} catch (InterruptedException e) {
// TODO 自动生成的 catch 块
e.printStackTrace();
}
ConfWebSocketService.sendMessage(textObj.toJSONString(), "2");//向页面发送消息
}
}
}
String[] strResults = (String[])(results.toArray(new String[results.size()]));
if (strResults.length > 0) {
//TODO 移除消费掉的数据
jedis.zrem("asr:text:"+topicId, strResults);
}
Thread.sleep(300);
} catch (Exception e) {
if (jedis != null) {
jedis.close();
}
e.printStackTrace();
}finally {
if (jedis != null) {
jedis.close();
}
}
}
}
二、通过 Redis 订阅消息:
package net.nblh.utils.common;import java.util.Set;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;/**
- 建立订阅者,订阅者去订阅频道(mychannel)
- @author lijd
*/
public class GetSpeechRecognition_YJ_Sub extends Thread{
private final JedisPool jedisPool;
private final GetSpeechRecognition_YJ_Msg msgListener = new GetSpeechRecognition_YJ_Msg();
private final String channel = "db0";//"mychannel";<span style="color: rgba(0, 0, 255, 1)">public</span><span style="color: rgba(0, 0, 0, 1)"> GetSpeechRecognition_YJ_Sub(JedisPool jedisPool) { </span><span style="color: rgba(0, 0, 255, 1)">super</span>("GetSpeechRecognition_YJ_Sub"<span style="color: rgba(0, 0, 0, 1)">); </span><span style="color: rgba(0, 0, 255, 1)">this</span>.jedisPool =<span style="color: rgba(0, 0, 0, 1)"> jedisPool; } @Override </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> run() { Jedis jedis </span>= <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">; </span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)"> { jedis </span>= jedisPool.getResource(); <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">取出一个连接</span> Set<String> result = jedis.zrange("asr:text:1112",0,-1<span style="color: rgba(0, 0, 0, 1)">); </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">jedis.subscribe(msgListener, channel); </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">通过subscribe的api去订阅,参数是订阅者和频道名 </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">注意:subscribe是一个阻塞的方法,在取消订阅该频道前,会一直阻塞在这,无法执行后续的代码 </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">这里在msgListener的onMessage方法里面收到消息后,调用了this.unsubscribe();来取消订阅,才会继续执行</span> System.out.println("继续执行后续代码。。。"<span style="color: rgba(0, 0, 0, 1)">); } </span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (Exception e) { </span><span style="color: rgba(0, 0, 255, 1)">if</span> (jedis != <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">) { jedis.close(); } e.printStackTrace(); }</span><span style="color: rgba(0, 0, 255, 1)">finally</span><span style="color: rgba(0, 0, 0, 1)"> { </span><span style="color: rgba(0, 0, 255, 1)">if</span> (jedis != <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">) { jedis.close(); } } }
}
package net.nblh.utils.common;import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;/**
- // 建立发布者,通过频道(mychannel)发布消息
- @author lijd
*/
public class GetSpeechRecognition_YJ_Pub extends Thread{
private final JedisPool jedisPool;</span><span style="color: rgba(0, 0, 255, 1)">public</span><span style="color: rgba(0, 0, 0, 1)"> GetSpeechRecognition_YJ_Pub(JedisPool jedisPool) { </span><span style="color: rgba(0, 0, 255, 1)">this</span>.jedisPool =<span style="color: rgba(0, 0, 0, 1)"> jedisPool; } @Override </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> run() { </span><span style="color: rgba(0, 0, 255, 1)">while</span> (<span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">) { Jedis jedis </span>= <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">; </span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)"> { Thread.sleep(</span>1000<span style="color: rgba(0, 0, 0, 1)">);</span><span style="color: rgba(0, 0, 0, 1)"> jedis </span>= jedisPool.getResource();<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">连接池中取出一个连接</span> String line = "fabuxiaoxi:"<span style="color: rgba(0, 0, 0, 1)">;</span> <span style="color: rgba(0, 0, 255, 1)">if</span> (!"quit"<span style="color: rgba(0, 0, 0, 1)">.equals(line)) { jedis.publish(</span>"mychannel", line);<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">从通过mychannel 频道发布消息</span> System.out.println(String.format("发布消息成功!channel: %s, message: %s", "mychannel"<span style="color: rgba(0, 0, 0, 1)">, line)); }</span><span style="color: rgba(0, 0, 255, 1)">else</span><span style="color: rgba(0, 0, 0, 1)"> { </span><span style="color: rgba(0, 0, 255, 1)">break</span><span style="color: rgba(0, 0, 0, 1)">; } </span><span style="color: rgba(0, 0, 255, 1)">if</span> (jedis != <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">) { jedis.close(); } }</span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (Exception e) { e.printStackTrace(); } } }
}
package net.nblh.utils.common;import redis.clients.jedis.JedisPubSub;
/**
- 建立消息监听类,并重写了 JedisPubSub 的一些相关方法
- @author lijd
*/
public class GetSpeechRecognition_YJ_Msg extends JedisPubSub{
public GetSpeechRecognition_YJ_Msg(){}@Override </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> onMessage(String channel, String message) { </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">收到消息会调用</span> System.out.println(String.format("收到消息成功! channel: %s, message: %s"<span style="color: rgba(0, 0, 0, 1)">, channel, message)); </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">this.unsubscribe();</span>
}
@Override </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> onSubscribe(String channel, <span style="color: rgba(0, 0, 255, 1)">int</span><span style="color: rgba(0, 0, 0, 1)"> subscribedChannels) { </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">订阅频道会调用</span> System.out.println(String.format("订阅频道成功! channel: %s, subscribedChannels %d"<span style="color: rgba(0, 0, 0, 1)">, channel, subscribedChannels)); } @Override </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> onUnsubscribe(String channel, <span style="color: rgba(0, 0, 255, 1)">int</span><span style="color: rgba(0, 0, 0, 1)"> subscribedChannels) { </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">取消订阅会调用</span> System.out.println(String.format("取消订阅频道! channel: %s, subscribedChannels: %d"<span style="color: rgba(0, 0, 0, 1)">, channel, subscribedChannels)); }
}