Skip to content

17. Mensajería en Spring

VentajaDescripción
DesacoplamientoServicios independientes
EscalabilidadProcesar mensajes en paralelo
ResilienciaMensajes persistentes
AsincroníaNo bloquea al emisor
CaracterísticaRabbitMQKafka
ModeloMessage QueueEvent Log
PersistenciaOpcionalPor defecto
ThroughputMedio-AltoMuy alto
OrdenPor colaPor partición
Caso de usoTareas, RPCStreaming, logs

Dependencia RabbitMQ
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Configuración RabbitMQ
# application.properties
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
Configuración completa
@Configuration
public class RabbitConfig {
// Cola directa
@Bean
public Queue pedidosQueue() {
return QueueBuilder.durable("pedidos.queue")
.withArgument("x-dead-letter-exchange", "dlx.exchange")
.withArgument("x-dead-letter-routing-key", "dlx.pedidos")
.build();
}
// Exchange directo
@Bean
public DirectExchange pedidosExchange() {
return new DirectExchange("pedidos.exchange");
}
// Binding
@Bean
public Binding pedidosBinding(Queue pedidosQueue, DirectExchange pedidosExchange) {
return BindingBuilder
.bind(pedidosQueue)
.to(pedidosExchange)
.with("pedido.creado");
}
// Exchange de tipo Topic (patrones)
@Bean
public TopicExchange notificacionesExchange() {
return new TopicExchange("notificaciones.exchange");
}
@Bean
public Queue emailQueue() {
return new Queue("notificaciones.email");
}
@Bean
public Queue smsQueue() {
return new Queue("notificaciones.sms");
}
@Bean
public Binding emailBinding(Queue emailQueue, TopicExchange notificacionesExchange) {
return BindingBuilder
.bind(emailQueue)
.to(notificacionesExchange)
.with("notificacion.*.email"); // Patrón
}
// Configurar JSON
@Bean
public Jackson2JsonMessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(messageConverter());
return template;
}
}
Productor
@Service
@Slf4j
public class PedidoService {
private final RabbitTemplate rabbitTemplate;
private final PedidoRepository repository;
public Pedido crear(PedidoDTO dto) {
Pedido pedido = repository.save(new Pedido(dto));
// Publicar evento
PedidoCreadoEvent evento = PedidoCreadoEvent.builder()
.pedidoId(pedido.getId())
.usuarioId(pedido.getUsuarioId())
.total(pedido.getTotal())
.timestamp(LocalDateTime.now())
.build();
rabbitTemplate.convertAndSend(
"pedidos.exchange", // Exchange
"pedido.creado", // Routing key
evento // Mensaje
);
log.info("Evento publicado: {}", evento);
return pedido;
}
}
Consumidor
@Service
@Slf4j
public class NotificacionService {
@RabbitListener(queues = "pedidos.queue")
public void procesarPedido(PedidoCreadoEvent evento) {
log.info("Procesando pedido: {}", evento.getPedidoId());
// Enviar email de confirmación
enviarEmailConfirmacion(evento);
}
// Con acknowledgment manual
@RabbitListener(queues = "pedidos.queue")
public void procesarConAck(PedidoCreadoEvent evento, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) {
try {
procesarEvento(evento);
channel.basicAck(tag, false); // Confirmar
} catch (Exception e) {
channel.basicNack(tag, false, true); // Reencolar
}
}
// Múltiples colas
@RabbitListener(queues = {"cola1", "cola2"})
public void procesarMultiple(String mensaje) {
// ...
}
}
Dead Letter Queue
@Configuration
public class DLQConfig {
@Bean
public Queue dlqQueue() {
return new Queue("dlq.pedidos");
}
@Bean
public DirectExchange dlxExchange() {
return new DirectExchange("dlx.exchange");
}
@Bean
public Binding dlqBinding() {
return BindingBuilder
.bind(dlqQueue())
.to(dlxExchange())
.with("dlx.pedidos");
}
}
// Consumidor de DLQ
@RabbitListener(queues = "dlq.pedidos")
public void procesarMensajesFallidos(Message message) {
log.error("Mensaje fallido: {}", new String(message.getBody()));
// Guardar en BD, notificar, etc.
}

Dependencia Kafka
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
Configuración Kafka
# application.properties
spring.kafka.bootstrap-servers=localhost:9092
# Productor
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
# Consumidor
spring.kafka.consumer.group-id=mi-grupo
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
Configuración de topics
@Configuration
public class KafkaConfig {
@Bean
public NewTopic pedidosTopic() {
return TopicBuilder.name("pedidos")
.partitions(3)
.replicas(1)
.build();
}
@Bean
public NewTopic notificacionesTopic() {
return TopicBuilder.name("notificaciones")
.partitions(5)
.replicas(1)
.config(TopicConfig.RETENTION_MS_CONFIG, "604800000") // 7 días
.build();
}
}
Productor Kafka
@Service
@Slf4j
public class PedidoKafkaProducer {
private final KafkaTemplate<String, Object> kafkaTemplate;
public void enviarPedido(PedidoCreadoEvent evento) {
// Envío simple
kafkaTemplate.send("pedidos", evento);
// Con key (para ordenamiento por partición)
kafkaTemplate.send("pedidos", evento.getPedidoId().toString(), evento);
// Con callback
CompletableFuture<SendResult<String, Object>> future =
kafkaTemplate.send("pedidos", evento);
future.whenComplete((result, ex) -> {
if (ex == null) {
log.info("Mensaje enviado: offset={}",
result.getRecordMetadata().offset());
} else {
log.error("Error enviando mensaje", ex);
}
});
}
// Envío a partición específica
public void enviarAParticion(String topic, int particion, String key, Object mensaje) {
kafkaTemplate.send(topic, particion, key, mensaje);
}
}
Consumidor Kafka
@Service
@Slf4j
public class PedidoKafkaConsumer {
@KafkaListener(topics = "pedidos", groupId = "pedidos-group")
public void consumir(PedidoCreadoEvent evento) {
log.info("Recibido: {}", evento);
procesarPedido(evento);
}
// Con metadatos
@KafkaListener(topics = "pedidos", groupId = "pedidos-group")
public void consumirConMetadata(
PedidoCreadoEvent evento,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp) {
log.info("Partición: {}, Offset: {}, Timestamp: {}",
partition, offset, timestamp);
procesarPedido(evento);
}
// Batch processing
@KafkaListener(topics = "pedidos", groupId = "batch-group",
containerFactory = "batchFactory")
public void consumirBatch(List<PedidoCreadoEvent> eventos) {
log.info("Procesando {} eventos", eventos.size());
eventos.forEach(this::procesarPedido);
}
// Múltiples topics
@KafkaListener(topics = {"pedidos", "devoluciones"})
public void consumirMultiple(ConsumerRecord<String, Object> record) {
log.info("Topic: {}, Mensaje: {}", record.topic(), record.value());
}
}
Manejo de errores
@Configuration
public class KafkaErrorConfig {
@Bean
public DefaultErrorHandler errorHandler(KafkaTemplate<String, Object> template) {
// Reintentos con backoff exponencial
ExponentialBackOff backOff = new ExponentialBackOff(1000L, 2.0);
backOff.setMaxElapsedTime(60000L); // 1 minuto máximo
DefaultErrorHandler handler = new DefaultErrorHandler(
new DeadLetterPublishingRecoverer(template), // Enviar a DLT
backOff
);
// No reintentar ciertos errores
handler.addNotRetryableExceptions(
ValidationException.class,
JsonParseException.class
);
return handler;
}
}
// Consumidor de Dead Letter Topic
@KafkaListener(topics = "pedidos.DLT", groupId = "dlt-group")
public void procesarDLT(ConsumerRecord<String, Object> record) {
log.error("Mensaje fallido: {}", record.value());
}

Eventos de dominio
// Evento
public record PedidoCreadoEvent(
Long pedidoId,
Long usuarioId,
BigDecimal total,
LocalDateTime timestamp
) {}
// Publicador
@Service
public class PedidoService {
private final ApplicationEventPublisher eventPublisher;
@Transactional
public Pedido crear(PedidoDTO dto) {
Pedido pedido = repository.save(new Pedido(dto));
// Publicar evento interno
eventPublisher.publishEvent(new PedidoCreadoEvent(
pedido.getId(),
pedido.getUsuarioId(),
pedido.getTotal(),
LocalDateTime.now()
));
return pedido;
}
}
// Listener
@Component
@Slf4j
public class PedidoEventListener {
@EventListener
public void onPedidoCreado(PedidoCreadoEvent evento) {
log.info("Pedido creado: {}", evento.pedidoId());
// Enviar notificación, actualizar inventario, etc.
}
// Asíncrono
@Async
@EventListener
public void onPedidoCreadoAsync(PedidoCreadoEvent evento) {
// Procesamiento en otro hilo
}
// Transaccional (después del commit)
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void onPedidoCreadoAfterCommit(PedidoCreadoEvent evento) {
// Solo se ejecuta si la transacción fue exitosa
}
}

Outbox Pattern
// Entidad Outbox
@Entity
@Table(name = "outbox_events")
public class OutboxEvent {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String aggregateType;
private String aggregateId;
private String eventType;
private String payload;
private LocalDateTime createdAt;
private boolean processed;
}
// Servicio con Outbox
@Service
public class PedidoService {
private final OutboxRepository outboxRepository;
@Transactional
public Pedido crear(PedidoDTO dto) {
Pedido pedido = repository.save(new Pedido(dto));
// Guardar evento en outbox (misma transacción)
OutboxEvent event = OutboxEvent.builder()
.aggregateType("Pedido")
.aggregateId(pedido.getId().toString())
.eventType("PedidoCreado")
.payload(objectMapper.writeValueAsString(pedido))
.createdAt(LocalDateTime.now())
.processed(false)
.build();
outboxRepository.save(event);
return pedido;
}
}
// Procesador de Outbox (scheduled)
@Component
@Slf4j
public class OutboxProcessor {
@Scheduled(fixedDelay = 1000)
@Transactional
public void processOutbox() {
List<OutboxEvent> events = outboxRepository.findByProcessedFalse();
for (OutboxEvent event : events) {
try {
kafkaTemplate.send(event.getEventType(), event.getPayload());
event.setProcessed(true);
outboxRepository.save(event);
} catch (Exception e) {
log.error("Error procesando outbox: {}", event.getId(), e);
}
}
}
}
🐝