您现在的位置: 首页 > 网站导航收录 > 百科知识百科知识
mq(分布式必备--MQ)
消息,队列,消费者mq(分布式必备--MQ)
发布时间:2019-02-08加入收藏来源:互联网点击:
3.如何保证从消息队列里拿到的数据按顺序执行?
通过算法,将需要保持先后顺序的消息放到同一个消息队列中,然后只用一个消费者去消费该队列。
rabbitmq:拆分多个queue,每个queue一个consumer,就是多一些queue而已,确实是麻烦点;或者就一个queue但是对应一个consumer,然后这个consumer内部用内存队列做排队,然后分发给底层不同的worker来处理kafka:一个topic,一个partition,一个consumer,内部单线程消费,写N个内存queue,然后N个线程分别消费一个内存queue即可4.如何解决消息队列的延时以及过期失效问题?有几百万消息持续积压几小时,怎么解决?
这个问题是生产环境出现事故后的,考察你如何快速的解决问题,,消息队列的延迟和过期失效是消息队列的自我保护机制,目的是为了防止本身被挤爆,当然是可以关闭保护,比如当某个消息消费失败5次后,就把这个消息丢弃等,尽量不要关掉保护机制,那么问题来了,那些被丢弃的消息难道就不要了吗?其实并不是,我们可以针对该业务,查询出来将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入mq里面去,把丢的数据给他补回来。
5.数据是通过push还是pull方式给到消费端,各自有什么弊端?
Push模型实时性好,但是因为状态维护等问题,难以应用到消息中间件的实践中,因为在Broker端需要维护Consumer的状态,不好适用于Broker去支持大量的Consumer的场景Consumer的消费速度是不一致的,Broker进行推送难以处理不同的Consumer的状况Broker难以应对Consumer无法消费消息的情况,因为不知道Consumer的宕机是短暂的还是永久的)另外推送消息(量可能会很大)也会加重Consumer的负载或者压垮Consumer。如果对应只有1个Consumer,用push比pull好。Pull模式实现起来会相对简单一些,但是实时性取决于轮训的频率,在对实时性要求高的场景不适合使用。3.如何使用MQ(以ActiveQM为例)
附上官网:http://activemq.apache.org/
附上启动服务访问地址:http://127.0.0.1:8161/admin/ 用户名/密码 admin/admin
发布订阅模式
生产者-发布
public class JMSProducer { private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名 private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码 private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址 private static final int SENDNUM=10; // 发送的消息数量 public static void main(String[] args) { ConnectionFactory connectionFactory; // 连接工厂 Connection connection = null; // 连接 Session session; // 会话 接受或者发送消息的线程 Destination destination; // 消息的目的地 MessageProducer messageProducer; // 消息生产者 // 实例化连接工厂 connectionFactory=new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL); try { connection=connectionFactory.createConnection(); // 通过连接工厂获取连接 connection.start(); // 启动连接 session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建Session // destination=session.createQueue("FirstQueue1"); // 创建消息队列 destination=session.createTopic("FirstTopic1"); messageProducer=session.createProducer(destination); // 创建消息生产者 sendMessage(session, messageProducer); // 发送消息 session.commit(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } finally{ if(connection!=null){ try { connection.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } /** * 发送消息 * @param session * @param messageProducer * @throws Exception */ public static void sendMessage(Session session,MessageProducer messageProducer)throws Exception{ for(int i=0;i<JMSProducer.SENDNUM;i ){ TextMessage message=session.createTextMessage("ActiveMQ 发送的消息" i); System.out.println("发送消息:" "ActiveMQ 发布的消息" i); messageProducer.send(message); } }}
消费者-订阅
/** * 消息监听-订阅者一 * @author Administrator * */public class Listener implements MessageListener{ @Override public void onMessage(Message message) { // TODO Auto-generated method stub try { System.out.println("订阅者一收到的消息:" ((TextMessage)message).getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } }}
public class JMSConsumer { private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名 private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码 private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址 public static void main(String[] args) { ConnectionFactory connectionFactory; // 连接工厂 Connection connection = null; // 连接 Session session; // 会话 接受或者发送消息的线程 Destination destination; // 消息的目的地 MessageConsumer messageConsumer; // 消息的消费者 // 实例化连接工厂 connectionFactory=new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL); try { connection=connectionFactory.createConnection(); // 通过连接工厂获取连接 connection.start(); // 启动连接 session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session // destination=session.createQueue("FirstQueue1"); // 创建连接的消息队列 destination=session.createTopic("FirstTopic1"); messageConsumer=session.createConsumer(destination); // 创建消息消费者 messageConsumer.setMessageListener(new Listener()); // 注册消息监听 } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } }}
我认为一个优秀的分布式消息队列,应该具备以下的能力:高吞吐、低时延(因场景而异),传输透明,伸缩性强,有冗灾能力,一致性顺序投递,同步 异步的发送方式,完善的运维和监控工具和开源
本文到此结束,希望对大家有所帮助呢。
上一篇:怎样看建筑图纸(老师傅通过6个妙招,分分钟教你看懂建筑图纸)
下一篇:返回列表
相关链接 |
||
网友回复(共有 0 条回复) |