commit 16f4c212ea94747f0d3d3f370f9955598bff3cf1 Author: ALEX <2604434353@qq.com> Date: Fri Jul 18 08:52:27 2025 +0800 first commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..549e00a --- /dev/null +++ b/.gitignore @@ -0,0 +1,33 @@ +HELP.md +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..5c926e0 --- /dev/null +++ b/pom.xml @@ -0,0 +1,127 @@ + + + 4.0.0 + com.ctgu + pro_eld_mqtt_compare + 0.0.1-SNAPSHOT + pro_eld_mqtt_compare + pro_eld_mqtt_compare + + 1.8 + UTF-8 + UTF-8 + 2.6.13 + + + + org.springframework.boot + spring-boot-starter-web + + + + com.mysql + mysql-connector-j + runtime + + + org.springframework.boot + spring-boot-starter-test + test + + + com.fasterxml.jackson.core + jackson-databind + 2.12.2 + + + com.fasterxml.jackson.core + jackson-annotations + 2.12.2 + + + org.springframework.boot + spring-boot-starter-webflux + 3.3.5 + + + org.apache.logging.log4j + log4j-core + 2.19.0 + + + org.apache.logging.log4j + log4j-slf4j2-impl + 2.19.0 + + + org.projectlombok + lombok + 1.18.32 + provided + + + + org.springframework.integration + spring-integration-mqtt + 5.5.6 + + + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.2.5 + + + + + org.springframework.boot + spring-boot-starter-integration + + + + + + org.springframework.boot + spring-boot-dependencies + ${spring-boot.version} + pom + import + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + 1.8 + 1.8 + UTF-8 + + + + org.springframework.boot + spring-boot-maven-plugin + ${spring-boot.version} + + com.ctgu.pro_eld_mqtt_compare.ProEldMqttCompareApplication + + + + + repackage + + repackage + + + + + + + + diff --git a/src/main/java/com/ctgu/pro_eld_mqtt_compare/ProEldMqttCompareApplication.java b/src/main/java/com/ctgu/pro_eld_mqtt_compare/ProEldMqttCompareApplication.java new file mode 100644 index 0000000..f6d91fd --- /dev/null +++ b/src/main/java/com/ctgu/pro_eld_mqtt_compare/ProEldMqttCompareApplication.java @@ -0,0 +1,12 @@ +package com.ctgu.pro_eld_mqtt_compare; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class ProEldMqttCompareApplication { + + public static void main(String[] args) { + SpringApplication.run(ProEldMqttCompareApplication.class, args); + } +} diff --git a/src/main/java/com/ctgu/pro_eld_mqtt_compare/StartupCompareRunner.java b/src/main/java/com/ctgu/pro_eld_mqtt_compare/StartupCompareRunner.java new file mode 100644 index 0000000..c5d08b6 --- /dev/null +++ b/src/main/java/com/ctgu/pro_eld_mqtt_compare/StartupCompareRunner.java @@ -0,0 +1,32 @@ +package com.ctgu.pro_eld_mqtt_compare; + +import com.ctgu.pro_eld_mqtt_compare.service.CompareService; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.stereotype.Component; + +/** + * @ClassName ApplicationRunner + * @Author Alex2 + * @Date 2025/7/4 10:32 + **/ +@Component +public class StartupCompareRunner implements ApplicationRunner { + + private final CompareService compareService; + + public StartupCompareRunner(CompareService compareService) { + this.compareService = compareService; + } + + @Override + public void run(ApplicationArguments args) throws Exception { +// compareService.compare(5, "vehicle"); +// compareService.compare(5, "device"); +// compareService.compare(5, "business"); +// compareService.compare(5, "driver"); +// compareService.compare(5, "vehicle_device"); +// compareService.compare(5, "driver_vehicle"); +// compareService.compare(5, "business_driver"); + } +} diff --git a/src/main/java/com/ctgu/pro_eld_mqtt_compare/api/ApiService.java b/src/main/java/com/ctgu/pro_eld_mqtt_compare/api/ApiService.java new file mode 100644 index 0000000..155a609 --- /dev/null +++ b/src/main/java/com/ctgu/pro_eld_mqtt_compare/api/ApiService.java @@ -0,0 +1,84 @@ +package com.ctgu.pro_eld_mqtt_compare.api; + +import com.ctgu.pro_eld_mqtt_compare.request.GetInfoVO; +import com.ctgu.pro_eld_mqtt_compare.response.*; +import com.ctgu.pro_eld_mqtt_compare.utils.TAG; +import lombok.extern.log4j.Log4j2; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.http.MediaType; +import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Mono; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Log4j2 +@Component +public class ApiService { + + @Autowired + WebClient.Builder webClientBuilder; + + private static final Map> tableObjectMapping = new HashMap<>(); + + static { + tableObjectMapping.put(TAG.VEHICLE, new ParameterizedTypeReference>() { + }); + tableObjectMapping.put(TAG.DEVICE, new ParameterizedTypeReference>() { + }); + tableObjectMapping.put(TAG.BUSINESS, new ParameterizedTypeReference>() { + }); + tableObjectMapping.put(TAG.DRIVER, new ParameterizedTypeReference>() { + }); + tableObjectMapping.put(TAG.VEHICLE_DEVICE, new ParameterizedTypeReference>() { + }); + tableObjectMapping.put(TAG.DRIVER_VEHICLE, new ParameterizedTypeReference>() { + }); + tableObjectMapping.put(TAG.BUSINESS_DRIVER, new ParameterizedTypeReference>() { + }); + } + + // Pakistan API + @SuppressWarnings("unchecked") + public Mono> getInfoPakistan(GetInfoVO getInfoVO) { + ParameterizedTypeReference> typeRef = (ParameterizedTypeReference>) tableObjectMapping.get(getInfoVO.getTableName()); + if (typeRef == null) { + throw new IllegalArgumentException("不支持该表名: " + getInfoVO.getTableName()); + } + + return webClientBuilder + .baseUrl("http://34.81.61.186") + .build() + .post() + .uri("/api/eldapp/v1/eldData") + .header("eldKey", "anytrek-eld-super-key-2024") + .contentType(MediaType.APPLICATION_JSON) + .bodyValue(getInfoVO) + .retrieve() + .bodyToMono(typeRef); + } + + // Pro API + @SuppressWarnings("unchecked") + public Mono> getInfoPro(Integer customerId, String tableName) { + ParameterizedTypeReference> typeRef = (ParameterizedTypeReference>) tableObjectMapping.get(tableName); + if (typeRef == null) { + throw new IllegalArgumentException("Unsupported table name: " + tableName); + } + + return webClientBuilder.baseUrl("https://eldapp.anytrek.app") + .build() + .get() + .uri(uriBuilder -> uriBuilder + .path("/api/eldapp/v1/eldData") + .queryParam("customerId", customerId) + .queryParam("tableName", tableName) + .build()) + .header("eldKey", "435e833ca0610442d25b8011475c8352") + .retrieve() + .bodyToMono(typeRef); + } +} \ No newline at end of file diff --git a/src/main/java/com/ctgu/pro_eld_mqtt_compare/api/WebClientConfig.java b/src/main/java/com/ctgu/pro_eld_mqtt_compare/api/WebClientConfig.java new file mode 100644 index 0000000..67f09a9 --- /dev/null +++ b/src/main/java/com/ctgu/pro_eld_mqtt_compare/api/WebClientConfig.java @@ -0,0 +1,43 @@ +package com.ctgu.pro_eld_mqtt_compare.api; + +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.http.client.reactive.ReactorClientHttpConnector; +import org.springframework.web.reactive.function.client.ExchangeStrategies; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.netty.http.client.HttpClient; + +import javax.net.ssl.SSLException; + +@Configuration +public class WebClientConfig { + + @Bean + public WebClient.Builder webClientBuilder() { + // 创建 HttpClient,跳过证书校验 + HttpClient httpClient = HttpClient.create() + .secure(sslContextSpec -> { + try { + sslContextSpec.sslContext( + SslContextBuilder.forClient() + .trustManager(InsecureTrustManagerFactory.INSTANCE) + .build() + ); + } catch (SSLException e) { + throw new RuntimeException(e); + } + }); + + ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder() + .codecs(configurer -> configurer + .defaultCodecs() + .maxInMemorySize(10 * 1024 * 1024)) // 10MB + .build(); + + return WebClient.builder() + .clientConnector(new ReactorClientHttpConnector(httpClient)) + .exchangeStrategies(exchangeStrategies); + } +} \ No newline at end of file diff --git a/src/main/java/com/ctgu/pro_eld_mqtt_compare/mqtt/MqttInboundConfiguration.java b/src/main/java/com/ctgu/pro_eld_mqtt_compare/mqtt/MqttInboundConfiguration.java new file mode 100644 index 0000000..e3744c2 --- /dev/null +++ b/src/main/java/com/ctgu/pro_eld_mqtt_compare/mqtt/MqttInboundConfiguration.java @@ -0,0 +1,65 @@ +package com.ctgu.pro_eld_mqtt_compare.mqtt; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; +import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + +import java.util.UUID; + +@Configuration +public class MqttInboundConfiguration { + + private final MqttInboundProperties mqttProperties; + private final ReceiverMessageHandler receiverMessageHandler; + + @Autowired + public MqttInboundConfiguration(MqttInboundProperties mqttProperties, ReceiverMessageHandler receiverMessageHandler) { + this.mqttProperties = mqttProperties; + this.receiverMessageHandler = receiverMessageHandler; + } + + @Bean + public MqttPahoClientFactory mqttInboundClientFactory() { + DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); + factory.getConnectionOptions().setServerURIs(new String[]{mqttProperties.getUrl()}); + factory.getConnectionOptions().setUserName(mqttProperties.getUsername()); + factory.getConnectionOptions().setPassword(mqttProperties.getPassword().toCharArray()); + factory.getConnectionOptions().setKeepAliveInterval(30); + factory.getConnectionOptions().setAutomaticReconnect(true); + factory.getConnectionOptions().setCleanSession(false); + return factory; + } + + @Bean + public MessageChannel messageInboundChannel() { + return new DirectChannel(); + } + + @Bean + public MqttPahoMessageDrivenChannelAdapter mqttInboundAdapter() { + return new MqttPahoMessageDrivenChannelAdapter( + mqttProperties.getUrl(), + mqttProperties.getClientId() + UUID.randomUUID(), + mqttInboundClientFactory(), + mqttProperties.getTopic() + ) {{ + setQos(mqttProperties.getQos()); + setConverter(new DefaultPahoMessageConverter()); + setOutputChannel(messageInboundChannel()); + }}; + } + + @Bean + @ServiceActivator(inputChannel = "messageInboundChannel") + public MessageHandler messageHandler() { + return receiverMessageHandler; + } +} diff --git a/src/main/java/com/ctgu/pro_eld_mqtt_compare/mqtt/MqttInboundProperties.java b/src/main/java/com/ctgu/pro_eld_mqtt_compare/mqtt/MqttInboundProperties.java new file mode 100644 index 0000000..b89626f --- /dev/null +++ b/src/main/java/com/ctgu/pro_eld_mqtt_compare/mqtt/MqttInboundProperties.java @@ -0,0 +1,17 @@ +package com.ctgu.pro_eld_mqtt_compare.mqtt; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +@Data +@Component +@ConfigurationProperties(prefix = "mqtt.inbound") +public class MqttInboundProperties { + private String url; + private String username; + private String password; + private String clientId; + private String topic; + private Integer qos; +} \ No newline at end of file diff --git a/src/main/java/com/ctgu/pro_eld_mqtt_compare/mqtt/MqttOutboundConfiguration.java b/src/main/java/com/ctgu/pro_eld_mqtt_compare/mqtt/MqttOutboundConfiguration.java new file mode 100644 index 0000000..19742f6 --- /dev/null +++ b/src/main/java/com/ctgu/pro_eld_mqtt_compare/mqtt/MqttOutboundConfiguration.java @@ -0,0 +1,55 @@ +package com.ctgu.pro_eld_mqtt_compare.mqtt; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; +import org.springframework.messaging.MessageChannel; + +import java.util.UUID; + +@Configuration +public class MqttOutboundConfiguration { + + private final MqttOutboundProperties mqttProperties; + + @Autowired + public MqttOutboundConfiguration(MqttOutboundProperties mqttProperties) { + this.mqttProperties = mqttProperties; + } + + @Bean + public MqttPahoClientFactory mqttOutboundClientFactory() { + DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); + factory.getConnectionOptions().setServerURIs(new String[]{mqttProperties.getUrl()}); + factory.getConnectionOptions().setUserName(mqttProperties.getUsername()); + factory.getConnectionOptions().setPassword(mqttProperties.getPassword().toCharArray()); + factory.getConnectionOptions().setKeepAliveInterval(30); + factory.getConnectionOptions().setAutomaticReconnect(true); + factory.getConnectionOptions().setCleanSession(false); + return factory; + } + + @Bean + public MessageChannel mqttOutboundChannel() { + return new DirectChannel(); + } + + @Bean + @ServiceActivator(inputChannel = "mqttOutboundChannel") + public MqttPahoMessageHandler mqttOutboundHandler() { + MqttPahoMessageHandler handler = new MqttPahoMessageHandler( + mqttProperties.getUrl(), + mqttProperties.getClientId() + UUID.randomUUID(), + mqttOutboundClientFactory() + ); + handler.setDefaultQos(mqttProperties.getQos()); + handler.setDefaultTopic(mqttProperties.getTopic()); + handler.setAsync(true); + return handler; + } +} diff --git a/src/main/java/com/ctgu/pro_eld_mqtt_compare/mqtt/MqttOutboundProperties.java b/src/main/java/com/ctgu/pro_eld_mqtt_compare/mqtt/MqttOutboundProperties.java new file mode 100644 index 0000000..d4d9ea0 --- /dev/null +++ b/src/main/java/com/ctgu/pro_eld_mqtt_compare/mqtt/MqttOutboundProperties.java @@ -0,0 +1,22 @@ +package com.ctgu.pro_eld_mqtt_compare.mqtt; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +/** + * @ClassName MqttOutboundProperties + * @Author Alex2 + * @Date 2025/7/8 9:45 + **/ +@Data +@Component +@ConfigurationProperties(prefix = "mqtt.outbound") +public class MqttOutboundProperties { + private String url; + private String username; + private String password; + private String clientId; + private String topic; + private Integer qos; +} diff --git a/src/main/java/com/ctgu/pro_eld_mqtt_compare/mqtt/ReceiverMessageHandler.java b/src/main/java/com/ctgu/pro_eld_mqtt_compare/mqtt/ReceiverMessageHandler.java new file mode 100644 index 0000000..177e4e5 --- /dev/null +++ b/src/main/java/com/ctgu/pro_eld_mqtt_compare/mqtt/ReceiverMessageHandler.java @@ -0,0 +1,69 @@ +package com.ctgu.pro_eld_mqtt_compare.mqtt; + + +import com.ctgu.pro_eld_mqtt_compare.request.PublishMQTTVO; +import com.ctgu.pro_eld_mqtt_compare.response.MqttMessageDTO; +import com.ctgu.pro_eld_mqtt_compare.service.CompareService; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.SneakyThrows; +import lombok.extern.log4j.Log4j2; +import org.springframework.beans.BeanUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.integration.annotation.Publisher; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.MessagingException; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Component; + +import java.util.Collections; +import java.util.List; +import java.util.Set; + +@Log4j2 +@Component +public class ReceiverMessageHandler implements MessageHandler { + + private static final String TAG = ReceiverMessageHandler.class.getSimpleName(); + + @Autowired + @Qualifier("mqttOutboundChannel") + private MessageChannel mqttOutboundChannel; + + private final CompareService compareService; + + public ReceiverMessageHandler(CompareService compareService) { + this.compareService = compareService; + } + + @SneakyThrows + @ServiceActivator(inputChannel = "messageInboundChannel") + public void handleMessage(Message message) throws MessagingException { + String mqttMessage = (String) message.getPayload(); + log.info("{} : <----- MQTT message = {}", TAG, mqttMessage); + ObjectMapper objectMapper = new ObjectMapper(); + JsonNode rootNode = objectMapper.readTree(mqttMessage); + + // 处理对象类型 + if (rootNode.isObject()) { + MqttMessageDTO messageDTO = objectMapper.treeToValue(rootNode, MqttMessageDTO.class); + PublishMQTTVO compare = compareService.compare(messageDTO.getCustomerId(), messageDTO.getTableName()); + String json = objectMapper.writeValueAsString(compare); + mqttOutboundChannel.send(MessageBuilder.withPayload(json).build()); + log.info("{} : -----> MQTT message = {}", TAG, json); + } else if (rootNode.isArray() && !rootNode.isEmpty() && rootNode.get(0).isInt()) { // 处理数组类型 + List customerIds = objectMapper.readValue(mqttMessage, new TypeReference>() { + }); + String json = objectMapper.writeValueAsString(customerIds); + mqttOutboundChannel.send(MessageBuilder.withPayload(json).build()); + log.info("{} : -----> MQTT message = {}", TAG, json); + } else { + log.warn("收到未知格式的 MQTT 消息: {}", mqttMessage); + } + } +} \ No newline at end of file diff --git a/src/main/java/com/ctgu/pro_eld_mqtt_compare/request/GetInfoVO.java b/src/main/java/com/ctgu/pro_eld_mqtt_compare/request/GetInfoVO.java new file mode 100644 index 0000000..a06675e --- /dev/null +++ b/src/main/java/com/ctgu/pro_eld_mqtt_compare/request/GetInfoVO.java @@ -0,0 +1,19 @@ +package com.ctgu.pro_eld_mqtt_compare.request; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @ClassName Request + * @Author Alex2 + * @Date 2025/7/4 10:16 + **/ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class GetInfoVO { + private Integer customerId; + + private String tableName; +} diff --git a/src/main/java/com/ctgu/pro_eld_mqtt_compare/request/PublishMQTTVO.java b/src/main/java/com/ctgu/pro_eld_mqtt_compare/request/PublishMQTTVO.java new file mode 100644 index 0000000..29e2ded --- /dev/null +++ b/src/main/java/com/ctgu/pro_eld_mqtt_compare/request/PublishMQTTVO.java @@ -0,0 +1,23 @@ +package com.ctgu.pro_eld_mqtt_compare.request; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Set; + +/** + * @ClassName PublishMQTTVO + * @Author Alex2 + * @Date 2025/7/7 14:05 + **/ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class PublishMQTTVO { + private Integer customerId; + + private String tableName; + + private Set ids; +} \ No newline at end of file diff --git a/src/main/java/com/ctgu/pro_eld_mqtt_compare/response/BusinessDTO.java b/src/main/java/com/ctgu/pro_eld_mqtt_compare/response/BusinessDTO.java new file mode 100644 index 0000000..93b692a --- /dev/null +++ b/src/main/java/com/ctgu/pro_eld_mqtt_compare/response/BusinessDTO.java @@ -0,0 +1,16 @@ +package com.ctgu.pro_eld_mqtt_compare.response; + +import lombok.Data; + +/** + * @ClassName BusinessDTO + * @Author Alex2 + * @Date 2025/7/4 15:18 + **/ +@Data +public class BusinessDTO { + private Integer id; + private String name; + private Integer isDelete; + private Integer customerId; +} diff --git a/src/main/java/com/ctgu/pro_eld_mqtt_compare/response/BusinessDriverDTO.java b/src/main/java/com/ctgu/pro_eld_mqtt_compare/response/BusinessDriverDTO.java new file mode 100644 index 0000000..9d54d0d --- /dev/null +++ b/src/main/java/com/ctgu/pro_eld_mqtt_compare/response/BusinessDriverDTO.java @@ -0,0 +1,17 @@ +package com.ctgu.pro_eld_mqtt_compare.response; + +import lombok.Data; + +/** + * @ClassName BusinessDriverDTO + * @Author Alex2 + * @Date 2025/7/4 14:45 + **/ +@Data +public class BusinessDriverDTO { + private Integer id; + private Integer businessId; + private Integer driverId; + private Integer isDelete; + private Integer customerId; +} diff --git a/src/main/java/com/ctgu/pro_eld_mqtt_compare/response/DeviceDTO.java b/src/main/java/com/ctgu/pro_eld_mqtt_compare/response/DeviceDTO.java new file mode 100644 index 0000000..0f2480e --- /dev/null +++ b/src/main/java/com/ctgu/pro_eld_mqtt_compare/response/DeviceDTO.java @@ -0,0 +1,19 @@ +package com.ctgu.pro_eld_mqtt_compare.response; + +import lombok.Data; + +import java.time.LocalDateTime; + +/** + * @ClassName DeviceDTO + * @Author Alex2 + * @Date 2025/7/4 15:15 + **/ +@Data +public class DeviceDTO { + private Integer id; + private Long userPackageId; + private Integer isDelete; + private LocalDateTime synTime; + private Integer customerId; +} diff --git a/src/main/java/com/ctgu/pro_eld_mqtt_compare/response/DriverDTO.java b/src/main/java/com/ctgu/pro_eld_mqtt_compare/response/DriverDTO.java new file mode 100644 index 0000000..0f2f0b8 --- /dev/null +++ b/src/main/java/com/ctgu/pro_eld_mqtt_compare/response/DriverDTO.java @@ -0,0 +1,18 @@ +package com.ctgu.pro_eld_mqtt_compare.response; + +import lombok.Data; + +/** + * @ClassName DriverDTO + * @Author Alex2 + * @Date 2025/7/4 15:19 + **/ +@Data +public class DriverDTO { + private Integer id; + private String name; + private String email; + private String phone; + private Integer isDelete; + private Integer customerId; +} diff --git a/src/main/java/com/ctgu/pro_eld_mqtt_compare/response/DriverVehicleDTO.java b/src/main/java/com/ctgu/pro_eld_mqtt_compare/response/DriverVehicleDTO.java new file mode 100644 index 0000000..c679c0b --- /dev/null +++ b/src/main/java/com/ctgu/pro_eld_mqtt_compare/response/DriverVehicleDTO.java @@ -0,0 +1,17 @@ +package com.ctgu.pro_eld_mqtt_compare.response; + +import lombok.Data; + +/** + * @ClassName ResponseDTO + * @Author Alex2 + * @Date 2025/7/4 10:17 + **/ +@Data +public class DriverVehicleDTO { + private Integer id; + private Integer driverId; + private Integer vehicleId; + private Integer isDelete; + private Integer customerId; +} diff --git a/src/main/java/com/ctgu/pro_eld_mqtt_compare/response/MqttMessageDTO.java b/src/main/java/com/ctgu/pro_eld_mqtt_compare/response/MqttMessageDTO.java new file mode 100644 index 0000000..cf4ca78 --- /dev/null +++ b/src/main/java/com/ctgu/pro_eld_mqtt_compare/response/MqttMessageDTO.java @@ -0,0 +1,19 @@ +package com.ctgu.pro_eld_mqtt_compare.response; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @ClassName MqttMessageDTO + * @Author Alex2 + * @Date 2025/7/7 11:12 + **/ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class MqttMessageDTO { + private Integer customerId; + + private String tableName; +} \ No newline at end of file diff --git a/src/main/java/com/ctgu/pro_eld_mqtt_compare/response/VehicleDTO.java b/src/main/java/com/ctgu/pro_eld_mqtt_compare/response/VehicleDTO.java new file mode 100644 index 0000000..65b805c --- /dev/null +++ b/src/main/java/com/ctgu/pro_eld_mqtt_compare/response/VehicleDTO.java @@ -0,0 +1,18 @@ +package com.ctgu.pro_eld_mqtt_compare.response; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Data; + +/** + * @ClassName VehicleDTO + * @Author Alex2 + * @Date 2025/7/4 14:50 + **/ +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class VehicleDTO { + private Integer id; + private String name; + private Integer isDelete; + private Integer customerId; +} diff --git a/src/main/java/com/ctgu/pro_eld_mqtt_compare/response/VehicleDeviceDTO.java b/src/main/java/com/ctgu/pro_eld_mqtt_compare/response/VehicleDeviceDTO.java new file mode 100644 index 0000000..ae98e5a --- /dev/null +++ b/src/main/java/com/ctgu/pro_eld_mqtt_compare/response/VehicleDeviceDTO.java @@ -0,0 +1,17 @@ +package com.ctgu.pro_eld_mqtt_compare.response; + +import lombok.Data; + +/** + * @ClassName VehicleDeviceDTO + * @Author Alex2 + * @Date 2025/7/4 14:33 + **/ +@Data +public class VehicleDeviceDTO { + private Integer id; + private Integer vehicleId; + private Integer deviceId; + private Integer isDelete; + private Integer customerId; +} diff --git a/src/main/java/com/ctgu/pro_eld_mqtt_compare/service/CompareService.java b/src/main/java/com/ctgu/pro_eld_mqtt_compare/service/CompareService.java new file mode 100644 index 0000000..21a1f38 --- /dev/null +++ b/src/main/java/com/ctgu/pro_eld_mqtt_compare/service/CompareService.java @@ -0,0 +1,136 @@ +package com.ctgu.pro_eld_mqtt_compare.service; + +import com.ctgu.pro_eld_mqtt_compare.api.ApiService; +import com.ctgu.pro_eld_mqtt_compare.request.GetInfoVO; +import com.ctgu.pro_eld_mqtt_compare.request.PublishMQTTVO; +import com.ctgu.pro_eld_mqtt_compare.utils.TAG; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.*; +import java.util.stream.Collectors; + +/** + * @ClassName CompareService + * @Author Alex2 + * @Date 2025/7/4 10:34 + **/ +@Slf4j +@Service +public class CompareService { + + private static ApiService apiService; + + private final ObjectMapper mapper = new ObjectMapper(); + + private static final Map> KEY_FIELDS = new HashMap<>(); + + static { + KEY_FIELDS.put(TAG.VEHICLE, TAG.VEHICLE_FIELDS); + KEY_FIELDS.put(TAG.DEVICE, TAG.DEVICE_FIELDS); + KEY_FIELDS.put(TAG.BUSINESS, TAG.BUSINESS_FIELDS); + KEY_FIELDS.put(TAG.DRIVER, TAG.DRIVER_FIELDS); + KEY_FIELDS.put(TAG.VEHICLE_DEVICE, TAG.VEHICLE_DEVICE_FIELDS); + KEY_FIELDS.put(TAG.DRIVER_VEHICLE, TAG.DRIVER_VEHICLE_FIELDS); + KEY_FIELDS.put(TAG.BUSINESS_DRIVER, TAG.BUSINESS_DRIVER_FIELDS); + } + + @Autowired + public CompareService(ApiService apiService) { + CompareService.apiService = apiService; + } + + // 生成 value 组合 + private static String getValue(Map item, List fieldNames) { + return fieldNames.stream() + .map(field -> String.valueOf(item.get(field))) + .collect(Collectors.joining("-")); + } + + // 获取 value 字段 + private static List getValueField(String tableName) { + List valueFields = KEY_FIELDS.get(tableName); + if (valueFields == null) { + throw new IllegalArgumentException("未配置字段列表: " + tableName); + } + return valueFields; + } + + // 比较 + public PublishMQTTVO compare(Integer customerId, String tableName) throws JsonProcessingException { + // Pakistan API + GetInfoVO driverVehicle = new GetInfoVO(customerId, tableName); + List postResultList = apiService.getInfoPakistan(driverVehicle).block(); + List> pakistanList = mapper.convertValue(postResultList, new TypeReference>>() { + }); + + // Pro API + List getResultList = apiService.getInfoPro(customerId, tableName).block(); + String json2 = mapper.writeValueAsString(getResultList); + List> proList = mapper.readValue(json2, new TypeReference>>() { + }); + return compareList(customerId, tableName, pakistanList, proList); + } + + // 比较列表 +// public void compareList(Integer customerId, String tableName, List> pakistanList, List> proList) { +// List valueFields = getValueField(tableName); +// // 构建 key 集合 +// Set pakistanSet = pakistanList.stream().map(item -> getValue(item, valueFields)).collect(Collectors.toSet()); +// log.info("customerId = {} - tableName = {} - pakistanSet = {}", customerId, tableName, pakistanSet); +// Set proSet = proList.stream().map(item -> getValue(item, valueFields)).collect(Collectors.toSet()); +// log.info("customerId = {} - tableName = {} - proSet = {}", customerId, tableName, proSet); +// +// // 输出所有不同项,key 不同时存在于 pakistanSet 和 proSet 中 +// List> diffList = Stream.concat(pakistanList.stream(), proList.stream()) +// .filter(item -> { +// String key = getValue(item, valueFields); +// return !(pakistanSet.contains(key) && proSet.contains(key)); +// }) +// .collect(Collectors.toList()); +// log.info("customerId = {} - tableName = {} - differList = {}", customerId, tableName, diffList); +// +// if ("vehicle".equals(tableName) || "device".equals(tableName) || "business".equals(tableName) || "driver".equals(tableName)) { +// Set differIds = diffList.stream().map(item -> item.get("id")).collect(Collectors.toSet()); +// log.info("customerId = {} - tableName = {} - differIds = {}", customerId, tableName, differIds); +// }else if ("vehicle_device".equals(tableName) || "driver_vehicle".equals(tableName) || "business_driver".equals(tableName)) { +// +// } +// } + + public PublishMQTTVO compareList(Integer customerId, String tableName, List> pakistanList, List> proList) { + List valueFields = getValueField(tableName); + + Map> pakistanMap = pakistanList.stream() + .collect(Collectors.toMap(item -> getValue(item, valueFields), item -> item,(existing, replacement) -> existing)); + log.info("customerId = {} - tableName = {} - pakistanSet = {}", customerId, tableName, pakistanMap.keySet()); + + Map> proMap = proList.stream() + .collect(Collectors.toMap(item -> getValue(item, valueFields), item -> item,(existing, replacement) -> existing)); + log.info("customerId = {} - tableName = {} - proSet = {}", customerId, tableName, proMap.keySet()); + + // 合并所有 key + Set allKeys = new HashSet<>(); + allKeys.addAll(pakistanMap.keySet()); + allKeys.addAll(proMap.keySet()); + + // 仅保留出现在一侧的 key(即差异 key) + List> diffList = allKeys.stream() + .filter(key -> !(pakistanMap.containsKey(key) && proMap.containsKey(key))) + .map(key -> pakistanMap.getOrDefault(key, proMap.get(key))) // 取出差异项 + .collect(Collectors.toList()); + + log.info("customerId = {} - tableName = {} - differList = {}", customerId, tableName, diffList); + + Set differIds = diffList.stream() + .map(item -> String.valueOf(item.get("id"))) + .map(Integer::valueOf) + .collect(Collectors.toSet()); + log.info("customerId = {} - tableName = {} - differIds = {}", customerId, tableName, differIds); + return new PublishMQTTVO(customerId, tableName, differIds); + } +} \ No newline at end of file diff --git a/src/main/java/com/ctgu/pro_eld_mqtt_compare/utils/TAG.java b/src/main/java/com/ctgu/pro_eld_mqtt_compare/utils/TAG.java new file mode 100644 index 0000000..2e863eb --- /dev/null +++ b/src/main/java/com/ctgu/pro_eld_mqtt_compare/utils/TAG.java @@ -0,0 +1,27 @@ +package com.ctgu.pro_eld_mqtt_compare.utils; + +import java.util.Arrays; +import java.util.List; + +/** + * @ClassName TAG + * @Author Alex2 + * @Date 2025/7/8 8:53 + **/ +public abstract class TAG { + public static final String VEHICLE = "vehicle"; + public static final String DEVICE = "device"; + public static final String BUSINESS = "business"; + public static final String DRIVER = "driver"; + public static final String VEHICLE_DEVICE = "vehicle_device"; + public static final String DRIVER_VEHICLE = "driver_vehicle"; + public static final String BUSINESS_DRIVER = "business_driver"; + + public static final List VEHICLE_FIELDS = Arrays.asList("id", "name", "isDelete", "customerId"); + public static final List DEVICE_FIELDS = Arrays.asList("id", "userPackageId", "isDelete", "synTime", "customerId"); + public static final List BUSINESS_FIELDS = Arrays.asList("id", "name", "isDelete", "customerId"); + public static final List DRIVER_FIELDS = Arrays.asList("id", "name", "isDelete", "email", "phone", "customerId"); + public static final List VEHICLE_DEVICE_FIELDS = Arrays.asList("id", "vehicleId", "deviceId", "isDelete", "customerId"); + public static final List DRIVER_VEHICLE_FIELDS = Arrays.asList("id", "driverId", "vehicleId", "isDelete", "customerId"); + public static final List BUSINESS_DRIVER_FIELDS = Arrays.asList("id", "businessId", "driverId", "isDelete", "customerId"); +} diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml new file mode 100644 index 0000000..0982fa7 --- /dev/null +++ b/src/main/resources/application.yaml @@ -0,0 +1,17 @@ +server: + port: 8889 +mqtt: + inbound: + url: tcp://35.223.79.247:1883 + username: derek123 + password: password123 + clientId: eld-mqtt + topic: eldData + qos: 0 + outbound: + url: tcp://35.223.79.247:1883 + username: derek123 + password: password123 + clientId: eld-mqtt + topic: mqtt-anyfleet-to-eldapp + qos: 0 \ No newline at end of file diff --git a/src/test/java/com/ctgu/pro_eld_mqtt_compare/ProEldMqttCompareApplicationTests.java b/src/test/java/com/ctgu/pro_eld_mqtt_compare/ProEldMqttCompareApplicationTests.java new file mode 100644 index 0000000..dd9c86b --- /dev/null +++ b/src/test/java/com/ctgu/pro_eld_mqtt_compare/ProEldMqttCompareApplicationTests.java @@ -0,0 +1,13 @@ +package com.ctgu.pro_eld_mqtt_compare; + +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; + +@SpringBootTest +class ProEldMqttCompareApplicationTests { + + @Test + void contextLoads() { + } + +}