17. Mensajería en Spring
📨 17.1 Introducción a la Mensajería
Section titled “📨 17.1 Introducción a la Mensajería”¿Por qué usar mensajería?
Section titled “¿Por qué usar mensajería?”| Ventaja | Descripción |
|---|---|
| Desacoplamiento | Servicios independientes |
| Escalabilidad | Procesar mensajes en paralelo |
| Resiliencia | Mensajes persistentes |
| Asincronía | No bloquea al emisor |
Comparativa de brokers
Section titled “Comparativa de brokers”| Característica | RabbitMQ | Kafka |
|---|---|---|
| Modelo | Message Queue | Event Log |
| Persistencia | Opcional | Por defecto |
| Throughput | Medio-Alto | Muy alto |
| Orden | Por cola | Por partición |
| Caso de uso | Tareas, RPC | Streaming, logs |
🐰 17.2 RabbitMQ
Section titled “🐰 17.2 RabbitMQ”Dependencia
Section titled “Dependencia”<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId></dependency>Configuración
Section titled “Configuración”# application.propertiesspring.rabbitmq.host=localhostspring.rabbitmq.port=5672spring.rabbitmq.username=guestspring.rabbitmq.password=guestspring.rabbitmq.virtual-host=/Configuración de colas y exchanges
Section titled “Configuración de colas y exchanges”@Configurationpublic class RabbitConfig {
// Cola directa @Bean public Queue pedidosQueue() { return QueueBuilder.durable("pedidos.queue") .withArgument("x-dead-letter-exchange", "dlx.exchange") .withArgument("x-dead-letter-routing-key", "dlx.pedidos") .build(); }
// Exchange directo @Bean public DirectExchange pedidosExchange() { return new DirectExchange("pedidos.exchange"); }
// Binding @Bean public Binding pedidosBinding(Queue pedidosQueue, DirectExchange pedidosExchange) { return BindingBuilder .bind(pedidosQueue) .to(pedidosExchange) .with("pedido.creado"); }
// Exchange de tipo Topic (patrones) @Bean public TopicExchange notificacionesExchange() { return new TopicExchange("notificaciones.exchange"); }
@Bean public Queue emailQueue() { return new Queue("notificaciones.email"); }
@Bean public Queue smsQueue() { return new Queue("notificaciones.sms"); }
@Bean public Binding emailBinding(Queue emailQueue, TopicExchange notificacionesExchange) { return BindingBuilder .bind(emailQueue) .to(notificacionesExchange) .with("notificacion.*.email"); // Patrón }
// Configurar JSON @Bean public Jackson2JsonMessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); }
@Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setMessageConverter(messageConverter()); return template; }}Productor
Section titled “Productor”@Service@Slf4jpublic class PedidoService {
private final RabbitTemplate rabbitTemplate; private final PedidoRepository repository;
public Pedido crear(PedidoDTO dto) { Pedido pedido = repository.save(new Pedido(dto));
// Publicar evento PedidoCreadoEvent evento = PedidoCreadoEvent.builder() .pedidoId(pedido.getId()) .usuarioId(pedido.getUsuarioId()) .total(pedido.getTotal()) .timestamp(LocalDateTime.now()) .build();
rabbitTemplate.convertAndSend( "pedidos.exchange", // Exchange "pedido.creado", // Routing key evento // Mensaje );
log.info("Evento publicado: {}", evento); return pedido; }}Consumidor
Section titled “Consumidor”@Service@Slf4jpublic class NotificacionService {
@RabbitListener(queues = "pedidos.queue") public void procesarPedido(PedidoCreadoEvent evento) { log.info("Procesando pedido: {}", evento.getPedidoId());
// Enviar email de confirmación enviarEmailConfirmacion(evento); }
// Con acknowledgment manual @RabbitListener(queues = "pedidos.queue") public void procesarConAck(PedidoCreadoEvent evento, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) { try { procesarEvento(evento); channel.basicAck(tag, false); // Confirmar } catch (Exception e) { channel.basicNack(tag, false, true); // Reencolar } }
// Múltiples colas @RabbitListener(queues = {"cola1", "cola2"}) public void procesarMultiple(String mensaje) { // ... }}Dead Letter Queue
Section titled “Dead Letter Queue”@Configurationpublic class DLQConfig {
@Bean public Queue dlqQueue() { return new Queue("dlq.pedidos"); }
@Bean public DirectExchange dlxExchange() { return new DirectExchange("dlx.exchange"); }
@Bean public Binding dlqBinding() { return BindingBuilder .bind(dlqQueue()) .to(dlxExchange()) .with("dlx.pedidos"); }}
// Consumidor de DLQ@RabbitListener(queues = "dlq.pedidos")public void procesarMensajesFallidos(Message message) { log.error("Mensaje fallido: {}", new String(message.getBody())); // Guardar en BD, notificar, etc.}🦅 17.3 Apache Kafka
Section titled “🦅 17.3 Apache Kafka”Dependencia
Section titled “Dependencia”<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId></dependency>Configuración
Section titled “Configuración”# application.propertiesspring.kafka.bootstrap-servers=localhost:9092
# Productorspring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
# Consumidorspring.kafka.consumer.group-id=mi-grupospring.kafka.consumer.auto-offset-reset=earliestspring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializerspring.kafka.consumer.properties.spring.json.trusted.packages=*Configuración Java
Section titled “Configuración Java”@Configurationpublic class KafkaConfig {
@Bean public NewTopic pedidosTopic() { return TopicBuilder.name("pedidos") .partitions(3) .replicas(1) .build(); }
@Bean public NewTopic notificacionesTopic() { return TopicBuilder.name("notificaciones") .partitions(5) .replicas(1) .config(TopicConfig.RETENTION_MS_CONFIG, "604800000") // 7 días .build(); }}Productor Kafka
Section titled “Productor Kafka”@Service@Slf4jpublic class PedidoKafkaProducer {
private final KafkaTemplate<String, Object> kafkaTemplate;
public void enviarPedido(PedidoCreadoEvent evento) { // Envío simple kafkaTemplate.send("pedidos", evento);
// Con key (para ordenamiento por partición) kafkaTemplate.send("pedidos", evento.getPedidoId().toString(), evento);
// Con callback CompletableFuture<SendResult<String, Object>> future = kafkaTemplate.send("pedidos", evento);
future.whenComplete((result, ex) -> { if (ex == null) { log.info("Mensaje enviado: offset={}", result.getRecordMetadata().offset()); } else { log.error("Error enviando mensaje", ex); } }); }
// Envío a partición específica public void enviarAParticion(String topic, int particion, String key, Object mensaje) { kafkaTemplate.send(topic, particion, key, mensaje); }}Consumidor Kafka
Section titled “Consumidor Kafka”@Service@Slf4jpublic class PedidoKafkaConsumer {
@KafkaListener(topics = "pedidos", groupId = "pedidos-group") public void consumir(PedidoCreadoEvent evento) { log.info("Recibido: {}", evento); procesarPedido(evento); }
// Con metadatos @KafkaListener(topics = "pedidos", groupId = "pedidos-group") public void consumirConMetadata( PedidoCreadoEvent evento, @Header(KafkaHeaders.RECEIVED_PARTITION) int partition, @Header(KafkaHeaders.OFFSET) long offset, @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp) {
log.info("Partición: {}, Offset: {}, Timestamp: {}", partition, offset, timestamp); procesarPedido(evento); }
// Batch processing @KafkaListener(topics = "pedidos", groupId = "batch-group", containerFactory = "batchFactory") public void consumirBatch(List<PedidoCreadoEvent> eventos) { log.info("Procesando {} eventos", eventos.size()); eventos.forEach(this::procesarPedido); }
// Múltiples topics @KafkaListener(topics = {"pedidos", "devoluciones"}) public void consumirMultiple(ConsumerRecord<String, Object> record) { log.info("Topic: {}, Mensaje: {}", record.topic(), record.value()); }}Manejo de errores
Section titled “Manejo de errores”@Configurationpublic class KafkaErrorConfig {
@Bean public DefaultErrorHandler errorHandler(KafkaTemplate<String, Object> template) { // Reintentos con backoff exponencial ExponentialBackOff backOff = new ExponentialBackOff(1000L, 2.0); backOff.setMaxElapsedTime(60000L); // 1 minuto máximo
DefaultErrorHandler handler = new DefaultErrorHandler( new DeadLetterPublishingRecoverer(template), // Enviar a DLT backOff );
// No reintentar ciertos errores handler.addNotRetryableExceptions( ValidationException.class, JsonParseException.class );
return handler; }}
// Consumidor de Dead Letter Topic@KafkaListener(topics = "pedidos.DLT", groupId = "dlt-group")public void procesarDLT(ConsumerRecord<String, Object> record) { log.error("Mensaje fallido: {}", record.value());}🎯 17.4 Eventos de Dominio
Section titled “🎯 17.4 Eventos de Dominio”Eventos con ApplicationEventPublisher
Section titled “Eventos con ApplicationEventPublisher”// Eventopublic record PedidoCreadoEvent( Long pedidoId, Long usuarioId, BigDecimal total, LocalDateTime timestamp) {}
// Publicador@Servicepublic class PedidoService {
private final ApplicationEventPublisher eventPublisher;
@Transactional public Pedido crear(PedidoDTO dto) { Pedido pedido = repository.save(new Pedido(dto));
// Publicar evento interno eventPublisher.publishEvent(new PedidoCreadoEvent( pedido.getId(), pedido.getUsuarioId(), pedido.getTotal(), LocalDateTime.now() ));
return pedido; }}
// Listener@Component@Slf4jpublic class PedidoEventListener {
@EventListener public void onPedidoCreado(PedidoCreadoEvent evento) { log.info("Pedido creado: {}", evento.pedidoId()); // Enviar notificación, actualizar inventario, etc. }
// Asíncrono @Async @EventListener public void onPedidoCreadoAsync(PedidoCreadoEvent evento) { // Procesamiento en otro hilo }
// Transaccional (después del commit) @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) public void onPedidoCreadoAfterCommit(PedidoCreadoEvent evento) { // Solo se ejecuta si la transacción fue exitosa }}📊 17.5 Patrones de Mensajería
Section titled “📊 17.5 Patrones de Mensajería”Outbox Pattern
Section titled “Outbox Pattern”// Entidad Outbox@Entity@Table(name = "outbox_events")public class OutboxEvent { @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Long id;
private String aggregateType; private String aggregateId; private String eventType; private String payload; private LocalDateTime createdAt; private boolean processed;}
// Servicio con Outbox@Servicepublic class PedidoService {
private final OutboxRepository outboxRepository;
@Transactional public Pedido crear(PedidoDTO dto) { Pedido pedido = repository.save(new Pedido(dto));
// Guardar evento en outbox (misma transacción) OutboxEvent event = OutboxEvent.builder() .aggregateType("Pedido") .aggregateId(pedido.getId().toString()) .eventType("PedidoCreado") .payload(objectMapper.writeValueAsString(pedido)) .createdAt(LocalDateTime.now()) .processed(false) .build();
outboxRepository.save(event); return pedido; }}
// Procesador de Outbox (scheduled)@Component@Slf4jpublic class OutboxProcessor {
@Scheduled(fixedDelay = 1000) @Transactional public void processOutbox() { List<OutboxEvent> events = outboxRepository.findByProcessedFalse();
for (OutboxEvent event : events) { try { kafkaTemplate.send(event.getEventType(), event.getPayload()); event.setProcessed(true); outboxRepository.save(event); } catch (Exception e) { log.error("Error procesando outbox: {}", event.getId(), e); } } }}
🐝