first commit
This commit is contained in:
33
.gitignore
vendored
Normal file
33
.gitignore
vendored
Normal file
@ -0,0 +1,33 @@
|
||||
HELP.md
|
||||
target/
|
||||
!.mvn/wrapper/maven-wrapper.jar
|
||||
!**/src/main/**/target/
|
||||
!**/src/test/**/target/
|
||||
|
||||
### STS ###
|
||||
.apt_generated
|
||||
.classpath
|
||||
.factorypath
|
||||
.project
|
||||
.settings
|
||||
.springBeans
|
||||
.sts4-cache
|
||||
|
||||
### IntelliJ IDEA ###
|
||||
.idea
|
||||
*.iws
|
||||
*.iml
|
||||
*.ipr
|
||||
|
||||
### NetBeans ###
|
||||
/nbproject/private/
|
||||
/nbbuild/
|
||||
/dist/
|
||||
/nbdist/
|
||||
/.nb-gradle/
|
||||
build/
|
||||
!**/src/main/**/build/
|
||||
!**/src/test/**/build/
|
||||
|
||||
### VS Code ###
|
||||
.vscode/
|
127
pom.xml
Normal file
127
pom.xml
Normal file
@ -0,0 +1,127 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<groupId>com.ctgu</groupId>
|
||||
<artifactId>pro_eld_mqtt_compare</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<name>pro_eld_mqtt_compare</name>
|
||||
<description>pro_eld_mqtt_compare</description>
|
||||
<properties>
|
||||
<java.version>1.8</java.version>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
|
||||
<spring-boot.version>2.6.13</spring-boot.version>
|
||||
</properties>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-web</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.mysql</groupId>
|
||||
<artifactId>mysql-connector-j</artifactId>
|
||||
<scope>runtime</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-databind</artifactId>
|
||||
<version>2.12.2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-annotations</artifactId>
|
||||
<version>2.12.2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-webflux</artifactId>
|
||||
<version>3.3.5</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.logging.log4j</groupId>
|
||||
<artifactId>log4j-core</artifactId>
|
||||
<version>2.19.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.logging.log4j</groupId>
|
||||
<artifactId>log4j-slf4j2-impl</artifactId>
|
||||
<version>2.19.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.projectlombok</groupId>
|
||||
<artifactId>lombok</artifactId>
|
||||
<version>1.18.32</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<!-- Spring Integration MQTT -->
|
||||
<dependency>
|
||||
<groupId>org.springframework.integration</groupId>
|
||||
<artifactId>spring-integration-mqtt</artifactId>
|
||||
<version>5.5.6</version> <!-- 根据需要选择合适的版本 -->
|
||||
</dependency>
|
||||
|
||||
<!-- Paho MQTT Client -->
|
||||
<dependency>
|
||||
<groupId>org.eclipse.paho</groupId>
|
||||
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
|
||||
<version>1.2.5</version>
|
||||
</dependency>
|
||||
|
||||
<!-- Spring Boot Starter Integration -->
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-integration</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-dependencies</artifactId>
|
||||
<version>${spring-boot.version}</version>
|
||||
<type>pom</type>
|
||||
<scope>import</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<version>3.8.1</version>
|
||||
<configuration>
|
||||
<source>1.8</source>
|
||||
<target>1.8</target>
|
||||
<encoding>UTF-8</encoding>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||
<version>${spring-boot.version}</version>
|
||||
<configuration>
|
||||
<mainClass>com.ctgu.pro_eld_mqtt_compare.ProEldMqttCompareApplication</mainClass>
|
||||
<!-- <skip>true</skip>-->
|
||||
</configuration>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>repackage</id>
|
||||
<goals>
|
||||
<goal>repackage</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
</project>
|
@ -0,0 +1,12 @@
|
||||
package com.ctgu.pro_eld_mqtt_compare;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
|
||||
@SpringBootApplication
|
||||
public class ProEldMqttCompareApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(ProEldMqttCompareApplication.class, args);
|
||||
}
|
||||
}
|
@ -0,0 +1,32 @@
|
||||
package com.ctgu.pro_eld_mqtt_compare;
|
||||
|
||||
import com.ctgu.pro_eld_mqtt_compare.service.CompareService;
|
||||
import org.springframework.boot.ApplicationArguments;
|
||||
import org.springframework.boot.ApplicationRunner;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @ClassName ApplicationRunner
|
||||
* @Author Alex2
|
||||
* @Date 2025/7/4 10:32
|
||||
**/
|
||||
@Component
|
||||
public class StartupCompareRunner implements ApplicationRunner {
|
||||
|
||||
private final CompareService compareService;
|
||||
|
||||
public StartupCompareRunner(CompareService compareService) {
|
||||
this.compareService = compareService;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run(ApplicationArguments args) throws Exception {
|
||||
// compareService.compare(5, "vehicle");
|
||||
// compareService.compare(5, "device");
|
||||
// compareService.compare(5, "business");
|
||||
// compareService.compare(5, "driver");
|
||||
// compareService.compare(5, "vehicle_device");
|
||||
// compareService.compare(5, "driver_vehicle");
|
||||
// compareService.compare(5, "business_driver");
|
||||
}
|
||||
}
|
@ -0,0 +1,84 @@
|
||||
package com.ctgu.pro_eld_mqtt_compare.api;
|
||||
|
||||
import com.ctgu.pro_eld_mqtt_compare.request.GetInfoVO;
|
||||
import com.ctgu.pro_eld_mqtt_compare.response.*;
|
||||
import com.ctgu.pro_eld_mqtt_compare.utils.TAG;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.core.ParameterizedTypeReference;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.reactive.function.client.WebClient;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@Log4j2
|
||||
@Component
|
||||
public class ApiService {
|
||||
|
||||
@Autowired
|
||||
WebClient.Builder webClientBuilder;
|
||||
|
||||
private static final Map<String, ParameterizedTypeReference<?>> tableObjectMapping = new HashMap<>();
|
||||
|
||||
static {
|
||||
tableObjectMapping.put(TAG.VEHICLE, new ParameterizedTypeReference<List<VehicleDTO>>() {
|
||||
});
|
||||
tableObjectMapping.put(TAG.DEVICE, new ParameterizedTypeReference<List<DeviceDTO>>() {
|
||||
});
|
||||
tableObjectMapping.put(TAG.BUSINESS, new ParameterizedTypeReference<List<BusinessDTO>>() {
|
||||
});
|
||||
tableObjectMapping.put(TAG.DRIVER, new ParameterizedTypeReference<List<DriverDTO>>() {
|
||||
});
|
||||
tableObjectMapping.put(TAG.VEHICLE_DEVICE, new ParameterizedTypeReference<List<VehicleDeviceDTO>>() {
|
||||
});
|
||||
tableObjectMapping.put(TAG.DRIVER_VEHICLE, new ParameterizedTypeReference<List<DriverVehicleDTO>>() {
|
||||
});
|
||||
tableObjectMapping.put(TAG.BUSINESS_DRIVER, new ParameterizedTypeReference<List<BusinessDriverDTO>>() {
|
||||
});
|
||||
}
|
||||
|
||||
// Pakistan API
|
||||
@SuppressWarnings("unchecked")
|
||||
public <T> Mono<List<T>> getInfoPakistan(GetInfoVO getInfoVO) {
|
||||
ParameterizedTypeReference<List<T>> typeRef = (ParameterizedTypeReference<List<T>>) tableObjectMapping.get(getInfoVO.getTableName());
|
||||
if (typeRef == null) {
|
||||
throw new IllegalArgumentException("不支持该表名: " + getInfoVO.getTableName());
|
||||
}
|
||||
|
||||
return webClientBuilder
|
||||
.baseUrl("http://34.81.61.186")
|
||||
.build()
|
||||
.post()
|
||||
.uri("/api/eldapp/v1/eldData")
|
||||
.header("eldKey", "anytrek-eld-super-key-2024")
|
||||
.contentType(MediaType.APPLICATION_JSON)
|
||||
.bodyValue(getInfoVO)
|
||||
.retrieve()
|
||||
.bodyToMono(typeRef);
|
||||
}
|
||||
|
||||
// Pro API
|
||||
@SuppressWarnings("unchecked")
|
||||
public <T> Mono<List<T>> getInfoPro(Integer customerId, String tableName) {
|
||||
ParameterizedTypeReference<List<T>> typeRef = (ParameterizedTypeReference<List<T>>) tableObjectMapping.get(tableName);
|
||||
if (typeRef == null) {
|
||||
throw new IllegalArgumentException("Unsupported table name: " + tableName);
|
||||
}
|
||||
|
||||
return webClientBuilder.baseUrl("https://eldapp.anytrek.app")
|
||||
.build()
|
||||
.get()
|
||||
.uri(uriBuilder -> uriBuilder
|
||||
.path("/api/eldapp/v1/eldData")
|
||||
.queryParam("customerId", customerId)
|
||||
.queryParam("tableName", tableName)
|
||||
.build())
|
||||
.header("eldKey", "435e833ca0610442d25b8011475c8352")
|
||||
.retrieve()
|
||||
.bodyToMono(typeRef);
|
||||
}
|
||||
}
|
@ -0,0 +1,43 @@
|
||||
package com.ctgu.pro_eld_mqtt_compare.api;
|
||||
|
||||
import io.netty.handler.ssl.SslContextBuilder;
|
||||
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
|
||||
import org.springframework.web.reactive.function.client.ExchangeStrategies;
|
||||
import org.springframework.web.reactive.function.client.WebClient;
|
||||
import reactor.netty.http.client.HttpClient;
|
||||
|
||||
import javax.net.ssl.SSLException;
|
||||
|
||||
@Configuration
|
||||
public class WebClientConfig {
|
||||
|
||||
@Bean
|
||||
public WebClient.Builder webClientBuilder() {
|
||||
// 创建 HttpClient,跳过证书校验
|
||||
HttpClient httpClient = HttpClient.create()
|
||||
.secure(sslContextSpec -> {
|
||||
try {
|
||||
sslContextSpec.sslContext(
|
||||
SslContextBuilder.forClient()
|
||||
.trustManager(InsecureTrustManagerFactory.INSTANCE)
|
||||
.build()
|
||||
);
|
||||
} catch (SSLException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
|
||||
ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
|
||||
.codecs(configurer -> configurer
|
||||
.defaultCodecs()
|
||||
.maxInMemorySize(10 * 1024 * 1024)) // 10MB
|
||||
.build();
|
||||
|
||||
return WebClient.builder()
|
||||
.clientConnector(new ReactorClientHttpConnector(httpClient))
|
||||
.exchangeStrategies(exchangeStrategies);
|
||||
}
|
||||
}
|
@ -0,0 +1,65 @@
|
||||
package com.ctgu.pro_eld_mqtt_compare.mqtt;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.integration.annotation.ServiceActivator;
|
||||
import org.springframework.integration.channel.DirectChannel;
|
||||
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
|
||||
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
|
||||
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
|
||||
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.MessageHandler;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
@Configuration
|
||||
public class MqttInboundConfiguration {
|
||||
|
||||
private final MqttInboundProperties mqttProperties;
|
||||
private final ReceiverMessageHandler receiverMessageHandler;
|
||||
|
||||
@Autowired
|
||||
public MqttInboundConfiguration(MqttInboundProperties mqttProperties, ReceiverMessageHandler receiverMessageHandler) {
|
||||
this.mqttProperties = mqttProperties;
|
||||
this.receiverMessageHandler = receiverMessageHandler;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public MqttPahoClientFactory mqttInboundClientFactory() {
|
||||
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
|
||||
factory.getConnectionOptions().setServerURIs(new String[]{mqttProperties.getUrl()});
|
||||
factory.getConnectionOptions().setUserName(mqttProperties.getUsername());
|
||||
factory.getConnectionOptions().setPassword(mqttProperties.getPassword().toCharArray());
|
||||
factory.getConnectionOptions().setKeepAliveInterval(30);
|
||||
factory.getConnectionOptions().setAutomaticReconnect(true);
|
||||
factory.getConnectionOptions().setCleanSession(false);
|
||||
return factory;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public MessageChannel messageInboundChannel() {
|
||||
return new DirectChannel();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public MqttPahoMessageDrivenChannelAdapter mqttInboundAdapter() {
|
||||
return new MqttPahoMessageDrivenChannelAdapter(
|
||||
mqttProperties.getUrl(),
|
||||
mqttProperties.getClientId() + UUID.randomUUID(),
|
||||
mqttInboundClientFactory(),
|
||||
mqttProperties.getTopic()
|
||||
) {{
|
||||
setQos(mqttProperties.getQos());
|
||||
setConverter(new DefaultPahoMessageConverter());
|
||||
setOutputChannel(messageInboundChannel());
|
||||
}};
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ServiceActivator(inputChannel = "messageInboundChannel")
|
||||
public MessageHandler messageHandler() {
|
||||
return receiverMessageHandler;
|
||||
}
|
||||
}
|
@ -0,0 +1,17 @@
|
||||
package com.ctgu.pro_eld_mqtt_compare.mqtt;
|
||||
|
||||
import lombok.Data;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Data
|
||||
@Component
|
||||
@ConfigurationProperties(prefix = "mqtt.inbound")
|
||||
public class MqttInboundProperties {
|
||||
private String url;
|
||||
private String username;
|
||||
private String password;
|
||||
private String clientId;
|
||||
private String topic;
|
||||
private Integer qos;
|
||||
}
|
@ -0,0 +1,55 @@
|
||||
package com.ctgu.pro_eld_mqtt_compare.mqtt;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.integration.annotation.ServiceActivator;
|
||||
import org.springframework.integration.channel.DirectChannel;
|
||||
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
|
||||
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
|
||||
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
@Configuration
|
||||
public class MqttOutboundConfiguration {
|
||||
|
||||
private final MqttOutboundProperties mqttProperties;
|
||||
|
||||
@Autowired
|
||||
public MqttOutboundConfiguration(MqttOutboundProperties mqttProperties) {
|
||||
this.mqttProperties = mqttProperties;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public MqttPahoClientFactory mqttOutboundClientFactory() {
|
||||
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
|
||||
factory.getConnectionOptions().setServerURIs(new String[]{mqttProperties.getUrl()});
|
||||
factory.getConnectionOptions().setUserName(mqttProperties.getUsername());
|
||||
factory.getConnectionOptions().setPassword(mqttProperties.getPassword().toCharArray());
|
||||
factory.getConnectionOptions().setKeepAliveInterval(30);
|
||||
factory.getConnectionOptions().setAutomaticReconnect(true);
|
||||
factory.getConnectionOptions().setCleanSession(false);
|
||||
return factory;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public MessageChannel mqttOutboundChannel() {
|
||||
return new DirectChannel();
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ServiceActivator(inputChannel = "mqttOutboundChannel")
|
||||
public MqttPahoMessageHandler mqttOutboundHandler() {
|
||||
MqttPahoMessageHandler handler = new MqttPahoMessageHandler(
|
||||
mqttProperties.getUrl(),
|
||||
mqttProperties.getClientId() + UUID.randomUUID(),
|
||||
mqttOutboundClientFactory()
|
||||
);
|
||||
handler.setDefaultQos(mqttProperties.getQos());
|
||||
handler.setDefaultTopic(mqttProperties.getTopic());
|
||||
handler.setAsync(true);
|
||||
return handler;
|
||||
}
|
||||
}
|
@ -0,0 +1,22 @@
|
||||
package com.ctgu.pro_eld_mqtt_compare.mqtt;
|
||||
|
||||
import lombok.Data;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @ClassName MqttOutboundProperties
|
||||
* @Author Alex2
|
||||
* @Date 2025/7/8 9:45
|
||||
**/
|
||||
@Data
|
||||
@Component
|
||||
@ConfigurationProperties(prefix = "mqtt.outbound")
|
||||
public class MqttOutboundProperties {
|
||||
private String url;
|
||||
private String username;
|
||||
private String password;
|
||||
private String clientId;
|
||||
private String topic;
|
||||
private Integer qos;
|
||||
}
|
@ -0,0 +1,69 @@
|
||||
package com.ctgu.pro_eld_mqtt_compare.mqtt;
|
||||
|
||||
|
||||
import com.ctgu.pro_eld_mqtt_compare.request.PublishMQTTVO;
|
||||
import com.ctgu.pro_eld_mqtt_compare.response.MqttMessageDTO;
|
||||
import com.ctgu.pro_eld_mqtt_compare.service.CompareService;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.integration.annotation.Publisher;
|
||||
import org.springframework.integration.annotation.ServiceActivator;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.MessageHandler;
|
||||
import org.springframework.messaging.MessagingException;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
@Log4j2
|
||||
@Component
|
||||
public class ReceiverMessageHandler implements MessageHandler {
|
||||
|
||||
private static final String TAG = ReceiverMessageHandler.class.getSimpleName();
|
||||
|
||||
@Autowired
|
||||
@Qualifier("mqttOutboundChannel")
|
||||
private MessageChannel mqttOutboundChannel;
|
||||
|
||||
private final CompareService compareService;
|
||||
|
||||
public ReceiverMessageHandler(CompareService compareService) {
|
||||
this.compareService = compareService;
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
@ServiceActivator(inputChannel = "messageInboundChannel")
|
||||
public void handleMessage(Message<?> message) throws MessagingException {
|
||||
String mqttMessage = (String) message.getPayload();
|
||||
log.info("{} : <----- MQTT message = {}", TAG, mqttMessage);
|
||||
ObjectMapper objectMapper = new ObjectMapper();
|
||||
JsonNode rootNode = objectMapper.readTree(mqttMessage);
|
||||
|
||||
// 处理对象类型
|
||||
if (rootNode.isObject()) {
|
||||
MqttMessageDTO messageDTO = objectMapper.treeToValue(rootNode, MqttMessageDTO.class);
|
||||
PublishMQTTVO compare = compareService.compare(messageDTO.getCustomerId(), messageDTO.getTableName());
|
||||
String json = objectMapper.writeValueAsString(compare);
|
||||
mqttOutboundChannel.send(MessageBuilder.withPayload(json).build());
|
||||
log.info("{} : -----> MQTT message = {}", TAG, json);
|
||||
} else if (rootNode.isArray() && !rootNode.isEmpty() && rootNode.get(0).isInt()) { // 处理数组类型
|
||||
List<Integer> customerIds = objectMapper.readValue(mqttMessage, new TypeReference<List<Integer>>() {
|
||||
});
|
||||
String json = objectMapper.writeValueAsString(customerIds);
|
||||
mqttOutboundChannel.send(MessageBuilder.withPayload(json).build());
|
||||
log.info("{} : -----> MQTT message = {}", TAG, json);
|
||||
} else {
|
||||
log.warn("收到未知格式的 MQTT 消息: {}", mqttMessage);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,19 @@
|
||||
package com.ctgu.pro_eld_mqtt_compare.request;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
/**
|
||||
* @ClassName Request
|
||||
* @Author Alex2
|
||||
* @Date 2025/7/4 10:16
|
||||
**/
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class GetInfoVO {
|
||||
private Integer customerId;
|
||||
|
||||
private String tableName;
|
||||
}
|
@ -0,0 +1,23 @@
|
||||
package com.ctgu.pro_eld_mqtt_compare.request;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* @ClassName PublishMQTTVO
|
||||
* @Author Alex2
|
||||
* @Date 2025/7/7 14:05
|
||||
**/
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class PublishMQTTVO {
|
||||
private Integer customerId;
|
||||
|
||||
private String tableName;
|
||||
|
||||
private Set<Integer> ids;
|
||||
}
|
@ -0,0 +1,16 @@
|
||||
package com.ctgu.pro_eld_mqtt_compare.response;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* @ClassName BusinessDTO
|
||||
* @Author Alex2
|
||||
* @Date 2025/7/4 15:18
|
||||
**/
|
||||
@Data
|
||||
public class BusinessDTO {
|
||||
private Integer id;
|
||||
private String name;
|
||||
private Integer isDelete;
|
||||
private Integer customerId;
|
||||
}
|
@ -0,0 +1,17 @@
|
||||
package com.ctgu.pro_eld_mqtt_compare.response;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* @ClassName BusinessDriverDTO
|
||||
* @Author Alex2
|
||||
* @Date 2025/7/4 14:45
|
||||
**/
|
||||
@Data
|
||||
public class BusinessDriverDTO {
|
||||
private Integer id;
|
||||
private Integer businessId;
|
||||
private Integer driverId;
|
||||
private Integer isDelete;
|
||||
private Integer customerId;
|
||||
}
|
@ -0,0 +1,19 @@
|
||||
package com.ctgu.pro_eld_mqtt_compare.response;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
/**
|
||||
* @ClassName DeviceDTO
|
||||
* @Author Alex2
|
||||
* @Date 2025/7/4 15:15
|
||||
**/
|
||||
@Data
|
||||
public class DeviceDTO {
|
||||
private Integer id;
|
||||
private Long userPackageId;
|
||||
private Integer isDelete;
|
||||
private LocalDateTime synTime;
|
||||
private Integer customerId;
|
||||
}
|
@ -0,0 +1,18 @@
|
||||
package com.ctgu.pro_eld_mqtt_compare.response;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* @ClassName DriverDTO
|
||||
* @Author Alex2
|
||||
* @Date 2025/7/4 15:19
|
||||
**/
|
||||
@Data
|
||||
public class DriverDTO {
|
||||
private Integer id;
|
||||
private String name;
|
||||
private String email;
|
||||
private String phone;
|
||||
private Integer isDelete;
|
||||
private Integer customerId;
|
||||
}
|
@ -0,0 +1,17 @@
|
||||
package com.ctgu.pro_eld_mqtt_compare.response;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* @ClassName ResponseDTO
|
||||
* @Author Alex2
|
||||
* @Date 2025/7/4 10:17
|
||||
**/
|
||||
@Data
|
||||
public class DriverVehicleDTO {
|
||||
private Integer id;
|
||||
private Integer driverId;
|
||||
private Integer vehicleId;
|
||||
private Integer isDelete;
|
||||
private Integer customerId;
|
||||
}
|
@ -0,0 +1,19 @@
|
||||
package com.ctgu.pro_eld_mqtt_compare.response;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
/**
|
||||
* @ClassName MqttMessageDTO
|
||||
* @Author Alex2
|
||||
* @Date 2025/7/7 11:12
|
||||
**/
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class MqttMessageDTO {
|
||||
private Integer customerId;
|
||||
|
||||
private String tableName;
|
||||
}
|
@ -0,0 +1,18 @@
|
||||
package com.ctgu.pro_eld_mqtt_compare.response;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* @ClassName VehicleDTO
|
||||
* @Author Alex2
|
||||
* @Date 2025/7/4 14:50
|
||||
**/
|
||||
@Data
|
||||
@JsonIgnoreProperties(ignoreUnknown = true)
|
||||
public class VehicleDTO {
|
||||
private Integer id;
|
||||
private String name;
|
||||
private Integer isDelete;
|
||||
private Integer customerId;
|
||||
}
|
@ -0,0 +1,17 @@
|
||||
package com.ctgu.pro_eld_mqtt_compare.response;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* @ClassName VehicleDeviceDTO
|
||||
* @Author Alex2
|
||||
* @Date 2025/7/4 14:33
|
||||
**/
|
||||
@Data
|
||||
public class VehicleDeviceDTO {
|
||||
private Integer id;
|
||||
private Integer vehicleId;
|
||||
private Integer deviceId;
|
||||
private Integer isDelete;
|
||||
private Integer customerId;
|
||||
}
|
@ -0,0 +1,136 @@
|
||||
package com.ctgu.pro_eld_mqtt_compare.service;
|
||||
|
||||
import com.ctgu.pro_eld_mqtt_compare.api.ApiService;
|
||||
import com.ctgu.pro_eld_mqtt_compare.request.GetInfoVO;
|
||||
import com.ctgu.pro_eld_mqtt_compare.request.PublishMQTTVO;
|
||||
import com.ctgu.pro_eld_mqtt_compare.utils.TAG;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* @ClassName CompareService
|
||||
* @Author Alex2
|
||||
* @Date 2025/7/4 10:34
|
||||
**/
|
||||
@Slf4j
|
||||
@Service
|
||||
public class CompareService {
|
||||
|
||||
private static ApiService apiService;
|
||||
|
||||
private final ObjectMapper mapper = new ObjectMapper();
|
||||
|
||||
private static final Map<String, List<String>> KEY_FIELDS = new HashMap<>();
|
||||
|
||||
static {
|
||||
KEY_FIELDS.put(TAG.VEHICLE, TAG.VEHICLE_FIELDS);
|
||||
KEY_FIELDS.put(TAG.DEVICE, TAG.DEVICE_FIELDS);
|
||||
KEY_FIELDS.put(TAG.BUSINESS, TAG.BUSINESS_FIELDS);
|
||||
KEY_FIELDS.put(TAG.DRIVER, TAG.DRIVER_FIELDS);
|
||||
KEY_FIELDS.put(TAG.VEHICLE_DEVICE, TAG.VEHICLE_DEVICE_FIELDS);
|
||||
KEY_FIELDS.put(TAG.DRIVER_VEHICLE, TAG.DRIVER_VEHICLE_FIELDS);
|
||||
KEY_FIELDS.put(TAG.BUSINESS_DRIVER, TAG.BUSINESS_DRIVER_FIELDS);
|
||||
}
|
||||
|
||||
@Autowired
|
||||
public CompareService(ApiService apiService) {
|
||||
CompareService.apiService = apiService;
|
||||
}
|
||||
|
||||
// 生成 value 组合
|
||||
private static String getValue(Map<String, Object> item, List<String> fieldNames) {
|
||||
return fieldNames.stream()
|
||||
.map(field -> String.valueOf(item.get(field)))
|
||||
.collect(Collectors.joining("-"));
|
||||
}
|
||||
|
||||
// 获取 value 字段
|
||||
private static List<String> getValueField(String tableName) {
|
||||
List<String> valueFields = KEY_FIELDS.get(tableName);
|
||||
if (valueFields == null) {
|
||||
throw new IllegalArgumentException("未配置字段列表: " + tableName);
|
||||
}
|
||||
return valueFields;
|
||||
}
|
||||
|
||||
// 比较
|
||||
public PublishMQTTVO compare(Integer customerId, String tableName) throws JsonProcessingException {
|
||||
// Pakistan API
|
||||
GetInfoVO driverVehicle = new GetInfoVO(customerId, tableName);
|
||||
List<Object> postResultList = apiService.getInfoPakistan(driverVehicle).block();
|
||||
List<Map<String, Object>> pakistanList = mapper.convertValue(postResultList, new TypeReference<List<Map<String, Object>>>() {
|
||||
});
|
||||
|
||||
// Pro API
|
||||
List<Object> getResultList = apiService.getInfoPro(customerId, tableName).block();
|
||||
String json2 = mapper.writeValueAsString(getResultList);
|
||||
List<Map<String, Object>> proList = mapper.readValue(json2, new TypeReference<List<Map<String, Object>>>() {
|
||||
});
|
||||
return compareList(customerId, tableName, pakistanList, proList);
|
||||
}
|
||||
|
||||
// 比较列表
|
||||
// public void compareList(Integer customerId, String tableName, List<Map<String, Object>> pakistanList, List<Map<String, Object>> proList) {
|
||||
// List<String> valueFields = getValueField(tableName);
|
||||
// // 构建 key 集合
|
||||
// Set<String> pakistanSet = pakistanList.stream().map(item -> getValue(item, valueFields)).collect(Collectors.toSet());
|
||||
// log.info("customerId = {} - tableName = {} - pakistanSet = {}", customerId, tableName, pakistanSet);
|
||||
// Set<String> proSet = proList.stream().map(item -> getValue(item, valueFields)).collect(Collectors.toSet());
|
||||
// log.info("customerId = {} - tableName = {} - proSet = {}", customerId, tableName, proSet);
|
||||
//
|
||||
// // 输出所有不同项,key 不同时存在于 pakistanSet 和 proSet 中
|
||||
// List<Map<String, Object>> diffList = Stream.concat(pakistanList.stream(), proList.stream())
|
||||
// .filter(item -> {
|
||||
// String key = getValue(item, valueFields);
|
||||
// return !(pakistanSet.contains(key) && proSet.contains(key));
|
||||
// })
|
||||
// .collect(Collectors.toList());
|
||||
// log.info("customerId = {} - tableName = {} - differList = {}", customerId, tableName, diffList);
|
||||
//
|
||||
// if ("vehicle".equals(tableName) || "device".equals(tableName) || "business".equals(tableName) || "driver".equals(tableName)) {
|
||||
// Set<Object> differIds = diffList.stream().map(item -> item.get("id")).collect(Collectors.toSet());
|
||||
// log.info("customerId = {} - tableName = {} - differIds = {}", customerId, tableName, differIds);
|
||||
// }else if ("vehicle_device".equals(tableName) || "driver_vehicle".equals(tableName) || "business_driver".equals(tableName)) {
|
||||
//
|
||||
// }
|
||||
// }
|
||||
|
||||
public PublishMQTTVO compareList(Integer customerId, String tableName, List<Map<String, Object>> pakistanList, List<Map<String, Object>> proList) {
|
||||
List<String> valueFields = getValueField(tableName);
|
||||
|
||||
Map<String, Map<String, Object>> pakistanMap = pakistanList.stream()
|
||||
.collect(Collectors.toMap(item -> getValue(item, valueFields), item -> item,(existing, replacement) -> existing));
|
||||
log.info("customerId = {} - tableName = {} - pakistanSet = {}", customerId, tableName, pakistanMap.keySet());
|
||||
|
||||
Map<String, Map<String, Object>> proMap = proList.stream()
|
||||
.collect(Collectors.toMap(item -> getValue(item, valueFields), item -> item,(existing, replacement) -> existing));
|
||||
log.info("customerId = {} - tableName = {} - proSet = {}", customerId, tableName, proMap.keySet());
|
||||
|
||||
// 合并所有 key
|
||||
Set<String> allKeys = new HashSet<>();
|
||||
allKeys.addAll(pakistanMap.keySet());
|
||||
allKeys.addAll(proMap.keySet());
|
||||
|
||||
// 仅保留出现在一侧的 key(即差异 key)
|
||||
List<Map<String, Object>> diffList = allKeys.stream()
|
||||
.filter(key -> !(pakistanMap.containsKey(key) && proMap.containsKey(key)))
|
||||
.map(key -> pakistanMap.getOrDefault(key, proMap.get(key))) // 取出差异项
|
||||
.collect(Collectors.toList());
|
||||
|
||||
log.info("customerId = {} - tableName = {} - differList = {}", customerId, tableName, diffList);
|
||||
|
||||
Set<Integer> differIds = diffList.stream()
|
||||
.map(item -> String.valueOf(item.get("id")))
|
||||
.map(Integer::valueOf)
|
||||
.collect(Collectors.toSet());
|
||||
log.info("customerId = {} - tableName = {} - differIds = {}", customerId, tableName, differIds);
|
||||
return new PublishMQTTVO(customerId, tableName, differIds);
|
||||
}
|
||||
}
|
27
src/main/java/com/ctgu/pro_eld_mqtt_compare/utils/TAG.java
Normal file
27
src/main/java/com/ctgu/pro_eld_mqtt_compare/utils/TAG.java
Normal file
@ -0,0 +1,27 @@
|
||||
package com.ctgu.pro_eld_mqtt_compare.utils;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @ClassName TAG
|
||||
* @Author Alex2
|
||||
* @Date 2025/7/8 8:53
|
||||
**/
|
||||
public abstract class TAG {
|
||||
public static final String VEHICLE = "vehicle";
|
||||
public static final String DEVICE = "device";
|
||||
public static final String BUSINESS = "business";
|
||||
public static final String DRIVER = "driver";
|
||||
public static final String VEHICLE_DEVICE = "vehicle_device";
|
||||
public static final String DRIVER_VEHICLE = "driver_vehicle";
|
||||
public static final String BUSINESS_DRIVER = "business_driver";
|
||||
|
||||
public static final List<String> VEHICLE_FIELDS = Arrays.asList("id", "name", "isDelete", "customerId");
|
||||
public static final List<String> DEVICE_FIELDS = Arrays.asList("id", "userPackageId", "isDelete", "synTime", "customerId");
|
||||
public static final List<String> BUSINESS_FIELDS = Arrays.asList("id", "name", "isDelete", "customerId");
|
||||
public static final List<String> DRIVER_FIELDS = Arrays.asList("id", "name", "isDelete", "email", "phone", "customerId");
|
||||
public static final List<String> VEHICLE_DEVICE_FIELDS = Arrays.asList("id", "vehicleId", "deviceId", "isDelete", "customerId");
|
||||
public static final List<String> DRIVER_VEHICLE_FIELDS = Arrays.asList("id", "driverId", "vehicleId", "isDelete", "customerId");
|
||||
public static final List<String> BUSINESS_DRIVER_FIELDS = Arrays.asList("id", "businessId", "driverId", "isDelete", "customerId");
|
||||
}
|
17
src/main/resources/application.yaml
Normal file
17
src/main/resources/application.yaml
Normal file
@ -0,0 +1,17 @@
|
||||
server:
|
||||
port: 8889
|
||||
mqtt:
|
||||
inbound:
|
||||
url: tcp://35.223.79.247:1883
|
||||
username: derek123
|
||||
password: password123
|
||||
clientId: eld-mqtt
|
||||
topic: eldData
|
||||
qos: 0
|
||||
outbound:
|
||||
url: tcp://35.223.79.247:1883
|
||||
username: derek123
|
||||
password: password123
|
||||
clientId: eld-mqtt
|
||||
topic: mqtt-anyfleet-to-eldapp
|
||||
qos: 0
|
@ -0,0 +1,13 @@
|
||||
package com.ctgu.pro_eld_mqtt_compare;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
|
||||
@SpringBootTest
|
||||
class ProEldMqttCompareApplicationTests {
|
||||
|
||||
@Test
|
||||
void contextLoads() {
|
||||
}
|
||||
|
||||
}
|
Reference in New Issue
Block a user