您现在的位置是:首页 >技术教程 >整合ES(Elasticsearch)+MQ(RabbitMQ)实现商品上下架/跨模块远程调用网站首页技术教程
整合ES(Elasticsearch)+MQ(RabbitMQ)实现商品上下架/跨模块远程调用
商品上下架过程中,修改数据库表上下架状态,之后通过RabbitMQ发送消息,最终实现ES中数据同步
- nacos服务发现和注册
 - ES面向文档型数据库
 - RabbitMQ
 

ES
- 用户将数据提交到Elasticsearch数据库中
 - 通过分词控制器将对应的语句分词
 - 将其权重和分词结果一并存入数据
 - 当用户搜索数据,在根据权重将结果排名、打分,并将结果呈现给用户
 

 
①、下载对应的ES
 
 
②、下载并安装分词器
 
 
 
③、启动ES(前提是系统配置好了JDK环境变量)
通过浏览器验证是否启动成功
 
④、官方推荐的客户端kibana
 
 
通过浏览器进行访问,通过DevTools工具
 
 
RabbitMQ
#拉取镜像
docker pull rabbitmq:3.8-management
#三个端口号:通信端口号,后台访问端口号,集群端口号
docker run -d --restart=always -p5672:5672 -p 12572:15672 --name rabbitmq rabbitmq:3.8-management
 
管理后台:http://ip:15672

业务模块
一、service-search模块消费者
①、依赖spring-boot-starter-ata-elasticsearch
 远程调用spring-cloud-starter-openfeign
 引入三定义service-client模块坐标
 引入rabbit模块rabbit_util
②、配置文件
server:
	port: 8204
feign:
	sentinel: # 熔断机制
		enabled: true
	client:	
		config:
			default: #配置全局的feign的调用超时时间 如果有指定的服务配置,则默认的配置不会生效
				connectTimeout: 30000 #指定消费者连接服务提供者超时时间
				readTimeout: 50000 #调用服务提供者的服务超时时间
spring:
	main:
		allow-bean-definition-overriding: true#当遇到同样名字,是否允许覆盖注册
	elasticsearch:
		rest:
			uris: http://localhost:9200
	rabbitmq:
		host: 192.168.197.128
		port: 5672
		username: guest
		password: guest
		publisher-confirm-type: CORRELATED #发布确认模式,消息是否被成功发送到交换机
		publisher-returns: true
		listener:
			simple:
				prefetch: 1
				concurrency: 3
				acknowledge-mode: manual #消费端手动确认
	redis:
		host: localhost
		port: 6379
		database: 0
		timeout: 1800000
		password:
		lettuce:
			pool:
				max-active: 20#最大连接数
				max-wait: -1
				max-idle: 5#最大则色等待时间(负数表示没限制)
				min-idle: 0#最小空闲
 
③、启动类(当前模块不需要连接数据库,所以排除。否则需要在application.yml中配置数据库连接)
@SpringBootApplication(exclude=DatasourceAutoConfiguration.class)//取消数据源自动配置
@EnableDiscoveryClient
@EnableFeignClients //远程调用
public class ServiceSearchApplication{
}
 
④、业务接口、服务
@RestController
@RequestMapping("api/search/sku")
public class SkuApiController{
	
	@Autowired
	private SkuService skuService;
	//上架,将数据加入ES中
	@GetMapping("inner/upperSku/{skuId}")
	public Result upperSku(@PathVariable Long skuId){
		skuService.upperSku(skuId);
		return Result.ok(null);
	}
	//下架,将数据从ES中删除
	@GetMapping("inner/lowerSku/{skuId}")
	public Result lowerSku(@PathVariable Long skuId){
		skuService.lowerSku(skuId);
		return Result.ok(null);
	}
}
 
service接口忽略,只创建其是实现类
@Service
public class SkuServiceImpl implements SkuService{
	@Autowired
	private SkuRepository skuRepository;
	//注入远程调用的接口
	@Autowired
	private ProductFeignClient productFeignClient;
	
	Override
	public void upperSku(Long skuId){
		
		//远程调用获取数据
		SkuInfo skuInfo = productFeignClient.getSkuInfo(skuId);
		if(skuInfo == null){
			return;
		}
		Category category = productFeignClient.getCategory(skuInfo.getCategoryId());
		
		SkuEs skuEs = new SkuEs();
		if(category != null){
			skuEs.setCategoryId(category.getId());
			skuEs.setCategoryName(category.getName());
		}
		//封装SKU信息
		skuEs.setId(skuId.getId());
		skuEs.setKeyWord(skuInfogetSkuName()+","+skuEs.getCategoryName());
		skuEs.setWareId(skuInfo.getWareId());
		skuEs.setIsNewPerson(skuInfo.getInsNewPerson());
		skuEs.setImgUrl(skuInfo.getImgUrl());
		skuEs.setTitle(skuInfo.getSkuName());
		if(skuInfo.getSkuTyoe()==SkuType.COMMON.getCode()){
			skuEs.setSkyType(0);
			skuEs.setPrice(skuInfo.getPrice().doubleValue());
			skuEs.setStock(skuInfo.getStock());
			skuEs.setSale(skuInfo.getSale());
			skuEs.setPerLimit(skuInfo.getPerLimit());
		}
		//调用方法添加到ES
		skuRepository.save(skuEs);
	}
	@Override
	public void lowerSku(Long skuId){
		skuRepository.deleteById(skuId);
	}
}
 
依赖中SpringData的模块,通过Spring提供的ElasticsearchRepository操作ES
 泛型是操作的实体类和其主键
public interface SkuRepository extends ElasticsearchRepository<SkuEs,Long>{
}
 
⑤、创建接收rabbitMQ消息的类
@Component
public class SkuReceiver{
	
	@Autowired
	private SkuService skuService;
	//durable=true  表示对消息进行持久化
	@RabbitListener(bindings = @QueueBinding(
		value=@Queue(value=MqConst.QUEUE_GOODS_UPPER,durable="true"),
		exchange=@Exchange(value=MqConst.EXCHANGE_GOODS_DIRECT),
		key={MqConst.ROUTING_GOODS_UPPER}
		))
	public void upperSku(Long skuId,Message message,Channel channel){
		
		if(skuId != null){
			//调用方法商品上架
			skuService.upperSku(skuId);
		}
		//配置文件中的acknowledge-mode: manual #消费端手动确认   true表示多个消息
		channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
	}
	//商品下架
	@RabbitListener(bindings = @QueueBinding(
		value=@Queue(value=MqConst.QUEUE_GOODS_LOWER,durable="true"),
		exchange=@Exchange(value=MqConst.EXCHANGE_GOODS_DIRECT),
		key={MqConst.ROUTING_GOODS_LOWER}
		))
	public void lowerSku(Long skuId,Message message,Channel channel){
		if(skuId != null){
			skuService.lowerSku(skuId);
		}
		channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
	}
}
 
二、service-product模块,生产者
依赖导入rabbit-util模块坐标
 配置文件application.yml中添加rabbit的连接信息
①、上架下架,发送消息给rabbitMQ
@ApiOperation("商品上下架")
@GetMapping("/publish/{skuId}/{status}")
public Result publish(@PathVariable Long skuId,@PathVariable Integer status){
	
	skuInfoService.publish(skuId,status);//发送消息给rabbitMQ
	return Result.ok(null);
}
 
//注入MQ
@Autowired
private RabbitService rabbitService;
@Override
public void publish(Long skuId,Integer status){
	if(status == 1){//上架
		SkuInfo skuInfo = baseMapper.selectById(skuId);
		skuInfo.setPublishStatus(status);
		baseMapper.updateById(skuIdFo);
		//上架时,整合mq把数据同步到es中
		rabbitService.sendMessage(MqConst.EXCHANGE_GOODS_DIRECT,MqConst.ROUTING_GOODS_UPPER,skuId);
	}else{//下架
		SkuInfo skuInfo = baseMapper.selectById(skuId);
		skuInfo.setPublishStatus(status);
		baseMapper.updateById(skuInfo);
		//下架,整合mq把数据同步到es中
		rabbitService.sendMessage(MqConst.EXCHANGE_GOODS_DIRECT,MqConst.ROUTING_GOODS_LOWER,skuId);
	}
}
 
②、为远程调用提供数据
@RestController
@RequestMapping("/api/product")
public class PruductInnerController{
	@Autowired
	private CategoryService categoryService;
	@Autowired
	private SkuInfoService skuInfoService;
	
	//根据SKUID获取分类信息
	@GetMapping("inner/getCategory/{categoryId}")
	public Category getCategory(@PathVariable Long categoryId){
		Category category = categoryService.getById(categoryId);
		return category;
	}
	//根据SKUID获取SKU信息
	@GetMapping("inner/getSkuInfo/{skuId}")
	public SkuInfo getSkuInfo(@PathVariable Long skuId){
		return skuInfoService.getById(skuId);
	}
}
 
三、service-client模块
定义生产者的接口
@FeignClient(value="service-product") 
pubic interface ProductFeignClient{
	
	@GetMapping("/api/product/inner/getCategory/{categoryId}")
	public Category getCategory(@PathVariable("categoryId") Long categoryId);
	@GetMapping("/api/product/inner/getSkuInfo/{skuId}")
	public SkuInfo getSkuInfo(@PathVariable("skuId") Long skuId);
}
 
三、common工程下创建rabbit-util模块
①、依赖
spring-cloud-starter-bus-amqp
②、封装发送消息的方法
@Service
public class RabbitService{
	
	@Autowired
	private RabbitTemplate rebbitTemplate;
	//发送那个消息的方法
	public boolean sendMessage(String exchange,String routingKey,Object message){
		
		rabbitTemplate.convertAndSend(exchage,routingKey,message); 
		return true;
	}
}
 
由于默认只能发送字符串,因此需要拓展配置类,发送对象转换为json
@Configuration
public class MQConfig{
	
	@Bean
	public MessageConverter messageConverter(){
		return new Jackson2JsonMessageConverter();
	}
}
 
消息确认

@Component
public class MQProducerAckConfig implements RabbitTemplate.ReturnCallback,RabbitTemplate.ConfirmCallback{
	
	@Autowired
	private RabbitTemplate rabbitTemplates;
	@PostConstruct
	public void init(){
		rabbitTemplate.setReturnCallback(this);
		rabbitTemplate.setConfirmCallback(this);
	}
	public void confirm(CorrelationData correlationData){
		if(ack){
			System.out.println("消息发送成功!");
		}else{
			System.out.println("消息发送失败!" + cause);
		}
	}
	@Override
	public void returnedMessage(Message message,int replyCode,String replayText,String exchange,String routingKey){
		System.out.println("消息主题: " + new String(message.getBody()));
		System.out.println("应答码: " + replayCode);
		System.out.println("描述: " + replayText);
		System.out.println("消息使用的交换器 exchange: " + exchange);
		System.out.println("消息使用的路由键 routing: " + routingKey);
	}
}
 
常量(交换机、路由ke、队列的等名称)
public class MqConst{
	
	//消息补偿
	public static final String MQ_KEY_PREFIX = "ssyx.mq:list";
	public static final int RETRY_COUNT = 3;
	//商品上下架
	public static final String EXCHANGE_GOODS_DIRECT= "ssyx.goods.direct";
	public static final String ROUTING_GOODS_UPPER = "ssyx.goods.upper";
	public static final String ROUTING_GOODS_LOWER = "ssyx.goods.lower";
	//队列
	public static final String QUEUE_GOODS_UPPER = "ssyx.goods.upper";
	public static final String QUEUE_GOODS_LOWER = "ssyx.goods.lower";
}
                
            




U8W/U8W-Mini使用与常见问题解决
QT多线程的5种用法,通过使用线程解决UI主界面的耗时操作代码,防止界面卡死。...
stm32使用HAL库配置串口中断收发数据(保姆级教程)
分享几个国内免费的ChatGPT镜像网址(亲测有效)
Allegro16.6差分等长设置及走线总结