java-Redis集合

引用包:jedis-3.0.1.jar、commons-pool2-2.6.0.jar

一、从 Redis 集合中实时获取数据:

连接 Redis

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public static Lock lock = new ReentrantLock();

/**
* 连接 Redis
*
@param conferenceId
*
@return
*/
public String startRedis(String topicId) {
textMessage
= "";
String result
= "";
try {
JedisPoolConfig config
= new JedisPoolConfig();
config.setTimeBetweenEvictionRunsMillis(
30000);
config.setMaxWaitMillis(
10 * 1000);
config.setMaxIdle(
1000);
config.setTestOnBorrow(
true);
JedisPool jedisPool
= new JedisPool(config, redisIpYJ, Integer.parseInt(redisPortYJ),10000);// 连接 redis 服务端
result = "连接 Redis 成功";
lock.lock();
try {
Thread thread
= new Thread(new Runnable() {
@Override
public void run() {
getRecordTextNew_Redis(jedisPool,topicId);
}
});
thread.start();
}
finally {
lock.unlock();
}
}
catch (Exception e) {
result
= "连接 Redis 失败:" + e.getMessage();
}
return result;
}

 

 

实时获取数据

public static Boolean isSelectRedis = false;//是否继续查询 Redis

/**
* 从 Redis 实时获取语音记录文本(党组会)
*
@param topicId
*/
public void getRecordTextNew_Redis(JedisPool jedisPool,String topicId) {
Jedis jedis
= null;
while (isSelectRedis) {
try {
jedis
= jedisPool.getResource(); //取出一个连接
Set<String> results = jedis.zrange("asr:text:"+topicId,0,-1);
for (String result: results) {
//TODO 消费 result
if (StringUtils.isNotEmpty(result)) {
JSONObject resultMsg
= JSONObject.parseObject(result);
String text
= resultMsg.getString("result");
System.out.println(
"消息 text:"+text);
String pgs
= "1";
String micName
= resultMsg.getString("roleName");
String micId
= resultMsg.getString("role");
String uId
= resultMsg.getString("uid");//段落 ID
if (StringUtils.isNotEmpty(text)) {
String dataText
= "<b>" + micName + ":</b>" + text;
String dataText2
= "<div id=""+ uId +""><b>" + micName + ":</b>" + text+"</div>";
textMap.put(uId, dataText2);
System.out.println(
"消息 dataText:"+dataText);
JSONObject textObj = new JSONObject();
textObj.put(
"dataText", dataText);
textObj.put(
"dataPgs", pgs);
textObj.put(
"dataUId", uId);
try {
Thread.sleep(
400);
}
catch (InterruptedException e) {
// TODO 自动生成的 catch 块
e.printStackTrace();
}
ConfWebSocketService.sendMessage(textObj.toJSONString(),
"2");//向页面发送消息
}
}
}
String[] strResults
= (String[])(results.toArray(new String[results.size()]));
if (strResults.length > 0) {
//TODO 移除消费掉的数据
jedis.zrem("asr:text:"+topicId, strResults);
}
Thread.sleep(
300);
}
catch (Exception e) {
if (jedis != null) {
jedis.close();
}
e.printStackTrace();
}
finally {
if (jedis != null) {
jedis.close();
}
}
}
}

二、通过 Redis 订阅消息:

package net.nblh.utils.common;

import java.util.Set;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

/**

  • 建立订阅者,订阅者去订阅频道(mychannel)
  • @author lijd

*/
public class GetSpeechRecognition_YJ_Sub extends Thread{
private final JedisPool jedisPool;
private final GetSpeechRecognition_YJ_Msg msgListener = new GetSpeechRecognition_YJ_Msg();
private final String channel = "db0";//"mychannel";

<span style="color: rgba(0, 0, 255, 1)">public</span><span style="color: rgba(0, 0, 0, 1)"> GetSpeechRecognition_YJ_Sub(JedisPool jedisPool) {
    </span><span style="color: rgba(0, 0, 255, 1)">super</span>("GetSpeechRecognition_YJ_Sub"<span style="color: rgba(0, 0, 0, 1)">);
    </span><span style="color: rgba(0, 0, 255, 1)">this</span>.jedisPool =<span style="color: rgba(0, 0, 0, 1)"> jedisPool;
}

@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> run() {
    Jedis jedis </span>= <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">;
    </span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)"> {
        jedis </span>= jedisPool.getResource();   <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">取出一个连接</span>
        Set&lt;String&gt; result = jedis.zrange("asr:text:1112",0,-1<span style="color: rgba(0, 0, 0, 1)">);
        </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">jedis.subscribe(msgListener, channel); </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">通过subscribe的api去订阅,参数是订阅者和频道名
        </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">注意:subscribe是一个阻塞的方法,在取消订阅该频道前,会一直阻塞在这,无法执行后续的代码
        </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">这里在msgListener的onMessage方法里面收到消息后,调用了this.unsubscribe();来取消订阅,才会继续执行</span>
        System.out.println("继续执行后续代码。。。"<span style="color: rgba(0, 0, 0, 1)">);
        
    } </span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (Exception e) {
        </span><span style="color: rgba(0, 0, 255, 1)">if</span> (jedis != <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">) {
            jedis.close();
        }
        e.printStackTrace();
    }</span><span style="color: rgba(0, 0, 255, 1)">finally</span><span style="color: rgba(0, 0, 0, 1)"> {
        </span><span style="color: rgba(0, 0, 255, 1)">if</span> (jedis != <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">) {
            jedis.close();
        }
    }
}

}

package net.nblh.utils.common;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

/**

  • // 建立发布者,通过频道(mychannel)发布消息
  • @author lijd

*/
public class GetSpeechRecognition_YJ_Pub extends Thread{
private final JedisPool jedisPool;

</span><span style="color: rgba(0, 0, 255, 1)">public</span><span style="color: rgba(0, 0, 0, 1)"> GetSpeechRecognition_YJ_Pub(JedisPool jedisPool) {
    </span><span style="color: rgba(0, 0, 255, 1)">this</span>.jedisPool =<span style="color: rgba(0, 0, 0, 1)"> jedisPool;
}

@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> run() {
    </span><span style="color: rgba(0, 0, 255, 1)">while</span> (<span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">) {
        Jedis jedis </span>= <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">;
        </span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)"> {
            Thread.sleep(</span>1000<span style="color: rgba(0, 0, 0, 1)">);</span><span style="color: rgba(0, 0, 0, 1)">
            jedis </span>= jedisPool.getResource();<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">连接池中取出一个连接</span>
            String line = "fabuxiaoxi:"<span style="color: rgba(0, 0, 0, 1)">;</span>
            <span style="color: rgba(0, 0, 255, 1)">if</span> (!"quit"<span style="color: rgba(0, 0, 0, 1)">.equals(line)) {
                jedis.publish(</span>"mychannel", line);<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">从通过mychannel 频道发布消息</span>
                System.out.println(String.format("发布消息成功!channel: %s, message: %s", "mychannel"<span style="color: rgba(0, 0, 0, 1)">, line));
            }</span><span style="color: rgba(0, 0, 255, 1)">else</span><span style="color: rgba(0, 0, 0, 1)"> {
                </span><span style="color: rgba(0, 0, 255, 1)">break</span><span style="color: rgba(0, 0, 0, 1)">;
            }
            </span><span style="color: rgba(0, 0, 255, 1)">if</span> (jedis != <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">) {
                jedis.close();
            }
        }</span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (Exception e) {
            e.printStackTrace();
        }            
    }
}

}

package net.nblh.utils.common;

import redis.clients.jedis.JedisPubSub;

/**

  • 建立消息监听类,并重写了 JedisPubSub 的一些相关方法
  • @author lijd

*/
public class GetSpeechRecognition_YJ_Msg extends JedisPubSub{
public GetSpeechRecognition_YJ_Msg(){}

@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> onMessage(String channel, String message) {       
    </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">收到消息会调用</span>
    System.out.println(String.format("收到消息成功! channel: %s, message: %s"<span style="color: rgba(0, 0, 0, 1)">, channel, message));
    </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">this.unsubscribe();</span>

}

@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> onSubscribe(String channel, <span style="color: rgba(0, 0, 255, 1)">int</span><span style="color: rgba(0, 0, 0, 1)"> subscribedChannels) {    
    </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">订阅频道会调用</span>
    System.out.println(String.format("订阅频道成功! channel: %s, subscribedChannels %d"<span style="color: rgba(0, 0, 0, 1)">,
    channel, subscribedChannels));
}

@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> onUnsubscribe(String channel, <span style="color: rgba(0, 0, 255, 1)">int</span><span style="color: rgba(0, 0, 0, 1)"> subscribedChannels) {   
    </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">取消订阅会调用</span>
    System.out.println(String.format("取消订阅频道! channel: %s, subscribedChannels: %d"<span style="color: rgba(0, 0, 0, 1)">,
            channel, subscribedChannels));
}

}