From 801dcdb463faaf84538bbd2e0b209c3d21737a5b Mon Sep 17 00:00:00 2001 From: Danny Lau Date: Sat, 25 May 2024 00:22:01 +0800 Subject: [PATCH 1/2] #1214 Only take files that are good for processing --- .../software/SPDF/config/AppConfig.java | 26 +++ .../pipeline/PipelineDirectoryProcessor.java | 19 +- .../software/SPDF/utils/FileMonitor.java | 162 ++++++++++++++++++ 3 files changed, 202 insertions(+), 5 deletions(-) create mode 100644 src/main/java/stirling/software/SPDF/utils/FileMonitor.java diff --git a/src/main/java/stirling/software/SPDF/config/AppConfig.java b/src/main/java/stirling/software/SPDF/config/AppConfig.java index 16618e1e..3723e4f8 100644 --- a/src/main/java/stirling/software/SPDF/config/AppConfig.java +++ b/src/main/java/stirling/software/SPDF/config/AppConfig.java @@ -2,8 +2,10 @@ package stirling.software.SPDF.config; import java.io.IOException; import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.Paths; import java.util.Properties; +import java.util.function.Predicate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingClass; @@ -108,4 +110,28 @@ public class AppConfig { public boolean missingActivSecurity() { return false; } + + @Bean(name = "watchedFoldersDir") + public String watchedFoldersDir() { + return "./pipeline/watchedFolders/"; + } + + @Bean(name = "finishedFoldersDir") + public String finishedFoldersDir() { + return "./pipeline/finishedFolders/"; + } + + @Bean(name = "directoryFilter") + public Predicate processPDFOnlyFilter() { + return path -> { + if (Files.isDirectory(path)) { + return !path.toString() + .contains( + "processing"); + } else { + String fileName = path.getFileName().toString(); + return fileName.endsWith(".pdf"); + } + }; + } } diff --git a/src/main/java/stirling/software/SPDF/controller/api/pipeline/PipelineDirectoryProcessor.java b/src/main/java/stirling/software/SPDF/controller/api/pipeline/PipelineDirectoryProcessor.java index c61b29e9..ce7e1b94 100644 --- a/src/main/java/stirling/software/SPDF/controller/api/pipeline/PipelineDirectoryProcessor.java +++ b/src/main/java/stirling/software/SPDF/controller/api/pipeline/PipelineDirectoryProcessor.java @@ -19,6 +19,7 @@ import java.util.stream.Stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.core.io.ByteArrayResource; import org.springframework.core.io.Resource; import org.springframework.scheduling.annotation.Scheduled; @@ -28,6 +29,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import stirling.software.SPDF.model.PipelineConfig; import stirling.software.SPDF.model.PipelineOperation; +import stirling.software.SPDF.utils.FileMonitor; @Service public class PipelineDirectoryProcessor { @@ -35,11 +37,18 @@ public class PipelineDirectoryProcessor { private static final Logger logger = LoggerFactory.getLogger(PipelineDirectoryProcessor.class); @Autowired private ObjectMapper objectMapper; @Autowired private ApiDocService apiDocService; - - final String watchedFoldersDir = "./pipeline/watchedFolders/"; - final String finishedFoldersDir = "./pipeline/finishedFolders/"; - @Autowired PipelineProcessor processor; + @Autowired FileMonitor fileMonitor; + + final String watchedFoldersDir; + final String finishedFoldersDir; + + public PipelineDirectoryProcessor( + @Qualifier("watchedFoldersDir") String watchedFoldersDir, + @Qualifier("finishedFoldersDir") String finishedFoldersDir) { + this.watchedFoldersDir = watchedFoldersDir; + this.finishedFoldersDir = finishedFoldersDir; + } @Scheduled(fixedRate = 60000) public void scanFolders() { @@ -130,7 +139,7 @@ public class PipelineDirectoryProcessor { throws IOException { try (Stream paths = Files.list(dir)) { if ("automated".equals(operation.getParameters().get("fileInput"))) { - return paths.filter(path -> !Files.isDirectory(path) && !path.equals(jsonFile)) + return paths.filter(path -> !Files.isDirectory(path) && !path.equals(jsonFile) && fileMonitor.isFileReadyForProcessing(path)) .map(Path::toFile) .toArray(File[]::new); } else { diff --git a/src/main/java/stirling/software/SPDF/utils/FileMonitor.java b/src/main/java/stirling/software/SPDF/utils/FileMonitor.java new file mode 100644 index 00000000..feff9c16 --- /dev/null +++ b/src/main/java/stirling/software/SPDF/utils/FileMonitor.java @@ -0,0 +1,162 @@ +package stirling.software.SPDF.utils; + +import static java.nio.file.StandardWatchEventKinds.*; + +import java.io.IOException; +import java.nio.file.*; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Predicate; +import java.util.stream.Stream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +@Component +public class FileMonitor { + private static final Logger logger = LoggerFactory.getLogger(FileMonitor.class); + private final Map path2KeyMapping; + private final Set newlyDiscoveredFiles; + private final ConcurrentHashMap.KeySetView readyForProcessingFiles; + private final WatchService watchService; + private final Predicate pathFilter; + private Set stagingFiles; + + /** + * @param rootDirectory the root directory to monitor + * @param pathFilter the filter to apply to the paths, return true if the path should be + * monitored, false otherwise + * @throws IOException + */ + @Autowired + public FileMonitor( + @Qualifier("watchedFoldersDir") String rootDirectory, + @Qualifier("directoryFilter") Predicate pathFilter) + throws IOException { + this.newlyDiscoveredFiles = new HashSet<>(); + this.path2KeyMapping = new HashMap<>(); + this.stagingFiles = new HashSet<>(); + this.pathFilter = pathFilter; + this.readyForProcessingFiles = ConcurrentHashMap.newKeySet(); + this.watchService = FileSystems.getDefault().newWatchService(); + + Path path = Path.of(rootDirectory); + recursivelyRegisterEntry(path); + + logger.info("Created a new file tracker for directory: {}", rootDirectory); + } + + private boolean shouldNotProcess(Path path) { + return !pathFilter.test(path); + } + + private void recursivelyRegisterEntry(Path dir) throws IOException { + WatchKey key = dir.register(watchService, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY); + path2KeyMapping.put(dir, key); + logger.info("Registered directory: {}", dir); + + try (Stream directoryVisitor = Files.walk(dir, 1)) { + final Iterator iterator = directoryVisitor.iterator(); + while (iterator.hasNext()) { + Path path = iterator.next(); + if (path.equals(dir) || shouldNotProcess(path)) continue; + + if (Files.isDirectory(path)) { + recursivelyRegisterEntry(path); + } else if (Files.isRegularFile(path)) { + handleFileCreation(path); + } + } + } + } + + @Scheduled(fixedRate = 5000) + public void trackFiles() { + /* + All files observed changes in the last iteration will be considered as staging files. + If those files are not modified in current iteration, they will be considered as ready for processing. + */ + stagingFiles = new HashSet<>(newlyDiscoveredFiles); + readyForProcessingFiles.clear(); + WatchKey key; + while ((key = watchService.poll()) != null) { + final Path watchingDir = (Path) key.watchable(); + key.pollEvents() + .forEach( + (evt) -> { + final Path path = (Path) evt.context(); + final WatchEvent.Kind kind = evt.kind(); + if (shouldNotProcess(path)) return; + + try { + if (Files.isDirectory(path)) { + if (kind == ENTRY_CREATE) { + handleDirectoryCreation(path); + } + /* + we don't need to handle directory deletion or modification + - directory deletion will be handled by key.reset() + - directory modification indicates a new file creation or deletion, which is handled by below + */ + } + Path relativePathFromRoot = watchingDir.resolve(path); + if (kind == ENTRY_CREATE) { + handleFileCreation(relativePathFromRoot); + } else if (kind == ENTRY_DELETE) { + handleFileRemoval(relativePathFromRoot); + } else if (kind == ENTRY_MODIFY) { + handleFileModification(relativePathFromRoot); + } + } catch (Exception e) { + logger.error("Error while processing file: {}", path, e); + } + }); + + boolean isKeyValid = key.reset(); + if (!isKeyValid) { // key is invalid when the directory itself is no longer exists + path2KeyMapping.remove((Path) key.watchable()); + if (path2KeyMapping.isEmpty()) { + logger.warn( + "FileMonitor is not monitoring any directory, no even the root directory."); + } + } + } + readyForProcessingFiles.addAll(stagingFiles); + } + + private void handleDirectoryCreation(Path dir) throws IOException { + WatchKey key = dir.register(watchService, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY); + path2KeyMapping.put(dir, key); + } + + private void handleFileRemoval(Path path) { + newlyDiscoveredFiles.remove(path); + stagingFiles.remove(path); + } + + private void handleFileCreation(Path path) { + newlyDiscoveredFiles.add(path); + stagingFiles.remove(path); + } + + private void handleFileModification(Path path) { + // the logic is the same + handleFileCreation(path); + } + + /** + * Check if the file is ready for processing. + * + *

A file is ready for processing if it is not being modified for 5000ms. + * + * @param path the path of the file + * @return true if the file is ready for processing, false otherwise + */ + public boolean isFileReadyForProcessing(Path path) { + return readyForProcessingFiles.contains(path); + } +} From 65b9544942d7792b845d8722fe7d2560162dcba6 Mon Sep 17 00:00:00 2001 From: Danny Lau Date: Wed, 29 May 2024 23:01:53 +0800 Subject: [PATCH 2/2] #1214 Fix unable to create FileMonitor if the root directory does not exist --- .../software/SPDF/utils/FileMonitor.java | 29 +++++++++++-------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/src/main/java/stirling/software/SPDF/utils/FileMonitor.java b/src/main/java/stirling/software/SPDF/utils/FileMonitor.java index feff9c16..c11352ef 100644 --- a/src/main/java/stirling/software/SPDF/utils/FileMonitor.java +++ b/src/main/java/stirling/software/SPDF/utils/FileMonitor.java @@ -24,13 +24,12 @@ public class FileMonitor { private final ConcurrentHashMap.KeySetView readyForProcessingFiles; private final WatchService watchService; private final Predicate pathFilter; + private final Path rootDir; private Set stagingFiles; /** * @param rootDirectory the root directory to monitor - * @param pathFilter the filter to apply to the paths, return true if the path should be - * monitored, false otherwise - * @throws IOException + * @param pathFilter the filter to apply to the paths, return true if the path should be monitored, false otherwise */ @Autowired public FileMonitor( @@ -43,11 +42,7 @@ public class FileMonitor { this.pathFilter = pathFilter; this.readyForProcessingFiles = ConcurrentHashMap.newKeySet(); this.watchService = FileSystems.getDefault().newWatchService(); - - Path path = Path.of(rootDirectory); - recursivelyRegisterEntry(path); - - logger.info("Created a new file tracker for directory: {}", rootDirectory); + this.rootDir = Path.of(rootDirectory); } private boolean shouldNotProcess(Path path) { @@ -82,6 +77,20 @@ public class FileMonitor { */ stagingFiles = new HashSet<>(newlyDiscoveredFiles); readyForProcessingFiles.clear(); + + if (path2KeyMapping.isEmpty()) { + logger.warn( + "not monitoring any directory, even the root directory itself: {}", rootDir); + if (Files.exists( + rootDir)) { // if the root directory exists, re-register the root directory + try { + recursivelyRegisterEntry(rootDir); + } catch (IOException e) { + logger.error("unable to register monitoring", e); + } + } + } + WatchKey key; while ((key = watchService.poll()) != null) { final Path watchingDir = (Path) key.watchable(); @@ -119,10 +128,6 @@ public class FileMonitor { boolean isKeyValid = key.reset(); if (!isKeyValid) { // key is invalid when the directory itself is no longer exists path2KeyMapping.remove((Path) key.watchable()); - if (path2KeyMapping.isEmpty()) { - logger.warn( - "FileMonitor is not monitoring any directory, no even the root directory."); - } } } readyForProcessingFiles.addAll(stagingFiles);