Merge pull request #1282 from kkdlau/bugfix/1214-grab-zero-byte-pdf
#1214 Only take pdf that are good for processing
This commit is contained in:
commit
5d6e23d4b7
3 changed files with 207 additions and 5 deletions
|
@ -2,8 +2,10 @@ package stirling.software.SPDF.config;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
import java.util.function.Predicate;
|
||||||
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingClass;
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingClass;
|
||||||
|
@ -108,4 +110,28 @@ public class AppConfig {
|
||||||
public boolean missingActivSecurity() {
|
public boolean missingActivSecurity() {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Bean(name = "watchedFoldersDir")
|
||||||
|
public String watchedFoldersDir() {
|
||||||
|
return "./pipeline/watchedFolders/";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean(name = "finishedFoldersDir")
|
||||||
|
public String finishedFoldersDir() {
|
||||||
|
return "./pipeline/finishedFolders/";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean(name = "directoryFilter")
|
||||||
|
public Predicate<Path> processPDFOnlyFilter() {
|
||||||
|
return path -> {
|
||||||
|
if (Files.isDirectory(path)) {
|
||||||
|
return !path.toString()
|
||||||
|
.contains(
|
||||||
|
"processing");
|
||||||
|
} else {
|
||||||
|
String fileName = path.getFileName().toString();
|
||||||
|
return fileName.endsWith(".pdf");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@ import java.util.stream.Stream;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
import org.springframework.core.io.ByteArrayResource;
|
import org.springframework.core.io.ByteArrayResource;
|
||||||
import org.springframework.core.io.Resource;
|
import org.springframework.core.io.Resource;
|
||||||
import org.springframework.scheduling.annotation.Scheduled;
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
|
@ -28,6 +29,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
import stirling.software.SPDF.model.PipelineConfig;
|
import stirling.software.SPDF.model.PipelineConfig;
|
||||||
import stirling.software.SPDF.model.PipelineOperation;
|
import stirling.software.SPDF.model.PipelineOperation;
|
||||||
|
import stirling.software.SPDF.utils.FileMonitor;
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
public class PipelineDirectoryProcessor {
|
public class PipelineDirectoryProcessor {
|
||||||
|
@ -35,11 +37,18 @@ public class PipelineDirectoryProcessor {
|
||||||
private static final Logger logger = LoggerFactory.getLogger(PipelineDirectoryProcessor.class);
|
private static final Logger logger = LoggerFactory.getLogger(PipelineDirectoryProcessor.class);
|
||||||
@Autowired private ObjectMapper objectMapper;
|
@Autowired private ObjectMapper objectMapper;
|
||||||
@Autowired private ApiDocService apiDocService;
|
@Autowired private ApiDocService apiDocService;
|
||||||
|
|
||||||
final String watchedFoldersDir = "./pipeline/watchedFolders/";
|
|
||||||
final String finishedFoldersDir = "./pipeline/finishedFolders/";
|
|
||||||
|
|
||||||
@Autowired PipelineProcessor processor;
|
@Autowired PipelineProcessor processor;
|
||||||
|
@Autowired FileMonitor fileMonitor;
|
||||||
|
|
||||||
|
final String watchedFoldersDir;
|
||||||
|
final String finishedFoldersDir;
|
||||||
|
|
||||||
|
/**
 * Creates the processor with the directories it works on.
 *
 * @param watchedFoldersDir value injected through the {@code watchedFoldersDir} qualifier
 * @param finishedFoldersDir value injected through the {@code finishedFoldersDir} qualifier
 */
public PipelineDirectoryProcessor(
        @Qualifier("watchedFoldersDir") String watchedFoldersDir,
        @Qualifier("finishedFoldersDir") String finishedFoldersDir) {
    this.finishedFoldersDir = finishedFoldersDir;
    this.watchedFoldersDir = watchedFoldersDir;
}
|
||||||
|
|
||||||
@Scheduled(fixedRate = 60000)
|
@Scheduled(fixedRate = 60000)
|
||||||
public void scanFolders() {
|
public void scanFolders() {
|
||||||
|
@ -130,7 +139,7 @@ public class PipelineDirectoryProcessor {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
try (Stream<Path> paths = Files.list(dir)) {
|
try (Stream<Path> paths = Files.list(dir)) {
|
||||||
if ("automated".equals(operation.getParameters().get("fileInput"))) {
|
if ("automated".equals(operation.getParameters().get("fileInput"))) {
|
||||||
return paths.filter(path -> !Files.isDirectory(path) && !path.equals(jsonFile))
|
return paths.filter(path -> !Files.isDirectory(path) && !path.equals(jsonFile) && fileMonitor.isFileReadyForProcessing(path))
|
||||||
.map(Path::toFile)
|
.map(Path::toFile)
|
||||||
.toArray(File[]::new);
|
.toArray(File[]::new);
|
||||||
} else {
|
} else {
|
||||||
|
|
167
src/main/java/stirling/software/SPDF/utils/FileMonitor.java
Normal file
167
src/main/java/stirling/software/SPDF/utils/FileMonitor.java
Normal file
|
@ -0,0 +1,167 @@
|
||||||
|
package stirling.software.SPDF.utils;
|
||||||
|
|
||||||
|
import static java.nio.file.StandardWatchEventKinds.*;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.file.*;
|
||||||
|
import java.util.*;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.function.Predicate;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
public class FileMonitor {
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(FileMonitor.class);
|
||||||
|
private final Map<Path, WatchKey> path2KeyMapping;
|
||||||
|
private final Set<Path> newlyDiscoveredFiles;
|
||||||
|
private final ConcurrentHashMap.KeySetView<Path, Boolean> readyForProcessingFiles;
|
||||||
|
private final WatchService watchService;
|
||||||
|
private final Predicate<Path> pathFilter;
|
||||||
|
private final Path rootDir;
|
||||||
|
private Set<Path> stagingFiles;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param rootDirectory the root directory to monitor
|
||||||
|
* @param pathFilter the filter to apply to the paths, return true if the path should be monitored, false otherwise
|
||||||
|
*/
|
||||||
|
@Autowired
|
||||||
|
public FileMonitor(
|
||||||
|
@Qualifier("watchedFoldersDir") String rootDirectory,
|
||||||
|
@Qualifier("directoryFilter") Predicate<Path> pathFilter)
|
||||||
|
throws IOException {
|
||||||
|
this.newlyDiscoveredFiles = new HashSet<>();
|
||||||
|
this.path2KeyMapping = new HashMap<>();
|
||||||
|
this.stagingFiles = new HashSet<>();
|
||||||
|
this.pathFilter = pathFilter;
|
||||||
|
this.readyForProcessingFiles = ConcurrentHashMap.newKeySet();
|
||||||
|
this.watchService = FileSystems.getDefault().newWatchService();
|
||||||
|
this.rootDir = Path.of(rootDirectory);
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean shouldNotProcess(Path path) {
|
||||||
|
return !pathFilter.test(path);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void recursivelyRegisterEntry(Path dir) throws IOException {
|
||||||
|
WatchKey key = dir.register(watchService, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
|
||||||
|
path2KeyMapping.put(dir, key);
|
||||||
|
logger.info("Registered directory: {}", dir);
|
||||||
|
|
||||||
|
try (Stream<Path> directoryVisitor = Files.walk(dir, 1)) {
|
||||||
|
final Iterator<Path> iterator = directoryVisitor.iterator();
|
||||||
|
while (iterator.hasNext()) {
|
||||||
|
Path path = iterator.next();
|
||||||
|
if (path.equals(dir) || shouldNotProcess(path)) continue;
|
||||||
|
|
||||||
|
if (Files.isDirectory(path)) {
|
||||||
|
recursivelyRegisterEntry(path);
|
||||||
|
} else if (Files.isRegularFile(path)) {
|
||||||
|
handleFileCreation(path);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Scheduled(fixedRate = 5000)
|
||||||
|
public void trackFiles() {
|
||||||
|
/*
|
||||||
|
All files observed changes in the last iteration will be considered as staging files.
|
||||||
|
If those files are not modified in current iteration, they will be considered as ready for processing.
|
||||||
|
*/
|
||||||
|
stagingFiles = new HashSet<>(newlyDiscoveredFiles);
|
||||||
|
readyForProcessingFiles.clear();
|
||||||
|
|
||||||
|
if (path2KeyMapping.isEmpty()) {
|
||||||
|
logger.warn(
|
||||||
|
"not monitoring any directory, even the root directory itself: {}", rootDir);
|
||||||
|
if (Files.exists(
|
||||||
|
rootDir)) { // if the root directory exists, re-register the root directory
|
||||||
|
try {
|
||||||
|
recursivelyRegisterEntry(rootDir);
|
||||||
|
} catch (IOException e) {
|
||||||
|
logger.error("unable to register monitoring", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
WatchKey key;
|
||||||
|
while ((key = watchService.poll()) != null) {
|
||||||
|
final Path watchingDir = (Path) key.watchable();
|
||||||
|
key.pollEvents()
|
||||||
|
.forEach(
|
||||||
|
(evt) -> {
|
||||||
|
final Path path = (Path) evt.context();
|
||||||
|
final WatchEvent.Kind<?> kind = evt.kind();
|
||||||
|
if (shouldNotProcess(path)) return;
|
||||||
|
|
||||||
|
try {
|
||||||
|
if (Files.isDirectory(path)) {
|
||||||
|
if (kind == ENTRY_CREATE) {
|
||||||
|
handleDirectoryCreation(path);
|
||||||
|
}
|
||||||
|
/*
|
||||||
|
we don't need to handle directory deletion or modification
|
||||||
|
- directory deletion will be handled by key.reset()
|
||||||
|
- directory modification indicates a new file creation or deletion, which is handled by below
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
Path relativePathFromRoot = watchingDir.resolve(path);
|
||||||
|
if (kind == ENTRY_CREATE) {
|
||||||
|
handleFileCreation(relativePathFromRoot);
|
||||||
|
} else if (kind == ENTRY_DELETE) {
|
||||||
|
handleFileRemoval(relativePathFromRoot);
|
||||||
|
} else if (kind == ENTRY_MODIFY) {
|
||||||
|
handleFileModification(relativePathFromRoot);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("Error while processing file: {}", path, e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
boolean isKeyValid = key.reset();
|
||||||
|
if (!isKeyValid) { // key is invalid when the directory itself is no longer exists
|
||||||
|
path2KeyMapping.remove((Path) key.watchable());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
readyForProcessingFiles.addAll(stagingFiles);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handleDirectoryCreation(Path dir) throws IOException {
|
||||||
|
WatchKey key = dir.register(watchService, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
|
||||||
|
path2KeyMapping.put(dir, key);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handleFileRemoval(Path path) {
|
||||||
|
newlyDiscoveredFiles.remove(path);
|
||||||
|
stagingFiles.remove(path);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handleFileCreation(Path path) {
|
||||||
|
newlyDiscoveredFiles.add(path);
|
||||||
|
stagingFiles.remove(path);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handleFileModification(Path path) {
|
||||||
|
// the logic is the same
|
||||||
|
handleFileCreation(path);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if the file is ready for processing.
|
||||||
|
*
|
||||||
|
* <p>A file is ready for processing if it is not being modified for 5000ms.
|
||||||
|
*
|
||||||
|
* @param path the path of the file
|
||||||
|
* @return true if the file is ready for processing, false otherwise
|
||||||
|
*/
|
||||||
|
public boolean isFileReadyForProcessing(Path path) {
|
||||||
|
return readyForProcessingFiles.contains(path);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue