From 3f4e6c7ce603fadd4f495228926e53e77795b5ee Mon Sep 17 00:00:00 2001
From: Adam Collins
Date: Mon, 20 Jan 2025 13:58:00 +1000
Subject: [PATCH] #256 Add sandbox functionality

---
 build.gradle                                    |   4 +-
 grails-app/conf/application.yml                 |   7 +
 .../au/org/ala/spatial/LayerController.groovy   |   2 +-
 .../org/ala/spatial/SandboxController.groovy    | 225 ++++++
 .../au/org/ala/spatial/SandboxService.groovy    | 683 ++++++++++++++++++
 .../org/ala/spatial/TaskQueueService.groovy     |   1 +
 .../au/org/ala/spatial/TasksService.groovy      |   1 +
 .../au/org/ala/spatial/SpatialConfig.groovy     |   7 +-
 .../org/ala/spatial/dto/SandboxIngress.groovy   |  17 +
 .../ala/spatial/intersect/SimpleRegion.groovy   |   6 +-
 .../ala/spatial/process/GeneratePoints.groovy   |  90 +--
 .../ala/spatial/process/SlaveProcess.groovy     |   2 +
 12 files changed, 972 insertions(+), 73 deletions(-)
 create mode 100644 grails-app/controllers/au/org/ala/spatial/SandboxController.groovy
 create mode 100644 grails-app/services/au/org/ala/spatial/SandboxService.groovy
 create mode 100644 src/main/groovy/au/org/ala/spatial/dto/SandboxIngress.groovy

diff --git a/build.gradle b/build.gradle
index 8c80330..47a8d5d 100644
--- a/build.gradle
+++ b/build.gradle
@@ -40,7 +40,7 @@ apply plugin: "org.grails.grails-gsp"
 apply plugin: "com.bertramlabs.asset-pipeline"
 
-def alaSecurityLibsVersion='6.2.0'
+def alaSecurityLibsVersion='6.3.0'
 def geotoolsVersion='27.2'
 
 war {
@@ -212,7 +212,7 @@ dependencies {
 
     // developmentOnly 'io.methvin:directory-watcher:0.15.0'
 
-
+    implementation 'org.gbif:dwc-api:1.47'
 }
 
 configurations {
diff --git a/grails-app/conf/application.yml b/grails-app/conf/application.yml
index 11d0d2c..f8c49d6 100644
--- a/grails-app/conf/application.yml
+++ b/grails-app/conf/application.yml
@@ -467,3 +467,10 @@ openapi:
     version: '@info.app.version@'
   cachetimeoutms: 4000
 
+# Internal sandbox service for processing uploaded CSV files
+sandboxEnabled: true
+sandboxSolrUrl: http://localhost:8983/solr
+sandboxSolrCollection: sandbox
+sandboxThreadCount: 2
+pipelinesCmd: "java -Dspark.local.dir=/data/spatial-data/sandbox/tmp -Djava.io.tmpdir=/data/spatial-data/sandbox/tmp -Dlog4j.configuration=file:/data/spatial-data/modelling/la-pipelines/log4j.properties -cp /data/spatial-data/modelling/la-pipelines/pipelines-2.19.0-SNAPSHOT-shaded.jar"
+pipelinesConfig: "--config=/data/spatial-data/modelling/la-pipelines/la-pipelines.yaml"
diff --git a/grails-app/controllers/au/org/ala/spatial/LayerController.groovy b/grails-app/controllers/au/org/ala/spatial/LayerController.groovy
index afc6e6c..23446bf 100644
--- a/grails-app/controllers/au/org/ala/spatial/LayerController.groovy
+++ b/grails-app/controllers/au/org/ala/spatial/LayerController.groovy
@@ -96,7 +96,7 @@ class LayerController {
     def img(String id) {
         if (layerService.getLayerByName(id)) {
             File f = new File(spatialConfig.data.dir + '/public/thumbnail/' + id + '.jpg')
-            render(file: f, fileName: "${id}.jpg")
+            render(file: f, fileName: "${id}.jpg", contentType: "image/jpeg")
         } else {
             response.sendError(404, "$id not found")
         }
     }
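[Note] The new application.yml keys above (pipelinesCmd, pipelinesConfig) are consumed by SandboxService.pipelinesExecute later in this patch: pipelinesCmd is split on spaces to form the command prefix, the per-run pipeline options follow, and pipelinesConfig is appended last. A minimal sketch of that assembly, using the example paths from the config above (illustrative only, not part of the patch):

    // Sketch only: mirrors SandboxService.pipelinesExecute below; values are the
    // example paths from application.yml, and <uid> is a placeholder dataset id.
    def pipelinesCmd = "java -cp /data/spatial-data/modelling/la-pipelines/pipelines-2.19.0-SNAPSHOT-shaded.jar"
    def pipelinesConfig = "--config=/data/spatial-data/modelling/la-pipelines/la-pipelines.yaml"
    def opts = ["au.org.ala.pipelines.java.DwcaToSolrPipeline", "--datasetId=<uid>"] // per-run options
    String[] cmd = pipelinesCmd.split(" ") + opts + [pipelinesConfig]
    new ProcessBuilder(cmd).redirectErrorStream(true).start()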
diff --git a/grails-app/controllers/au/org/ala/spatial/SandboxController.groovy b/grails-app/controllers/au/org/ala/spatial/SandboxController.groovy
new file mode 100644
index 0000000..e475bff
--- /dev/null
+++ b/grails-app/controllers/au/org/ala/spatial/SandboxController.groovy
@@ -0,0 +1,225 @@
+/*
+ * Copyright (C) 2016 Atlas of Living Australia
+ * All Rights Reserved.
+ *
+ * The contents of this file are subject to the Mozilla Public
+ * License Version 1.1 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of
+ * the License at http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS
+ * IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * rights and limitations under the License.
+ */
+
+package au.org.ala.spatial
+
+import au.ala.org.ws.security.RequireApiKey
+import au.org.ala.plugins.openapi.Path
+import au.org.ala.spatial.dto.SandboxIngress
+import au.org.ala.web.AuthService
+import grails.converters.JSON
+import io.swagger.v3.oas.annotations.Operation
+import io.swagger.v3.oas.annotations.Parameter
+import io.swagger.v3.oas.annotations.media.Content
+import io.swagger.v3.oas.annotations.media.Schema
+import io.swagger.v3.oas.annotations.parameters.RequestBody
+import io.swagger.v3.oas.annotations.responses.ApiResponse
+import io.swagger.v3.oas.annotations.security.SecurityRequirement
+import org.springframework.web.multipart.MultipartFile
+
+import javax.ws.rs.Produces
+
+import static io.swagger.v3.oas.annotations.enums.ParameterIn.QUERY
+
+class SandboxController {
+
+    SpatialObjectsService spatialObjectsService
+
+    SpatialConfig spatialConfig
+    def dataSource
+    def sandboxService
+    AuthService authService
+
+    @Operation(
+            method = "POST",
+            tags = "uploads",
+            operationId = "uploadCSV",
+            summary = "Upload a CSV or zipped CSV file containing occurrence points",
+            parameters = [
+                    @Parameter(
+                            name = "name",
+                            in = QUERY,
+                            description = "datasetName",
+                            schema = @Schema(implementation = String),
+                            required = true
+                    )
+            ],
+            requestBody = @RequestBody(
+                    description = "Uploaded CSV or zipped CSV file",
+                    content = @Content(
+                            mediaType = 'application/zip',
+                            schema = @Schema(
+                                    type = "string",
+                                    format = "binary"
+                            )
+                    )
+            ),
+            responses = [
+                    @ApiResponse(
+                            description = "Recognised header and a dataResourceUid",
+                            responseCode = "200",
+                            content = [
+                                    @Content(
+                                            mediaType = "application/json"
+                                    )
+                            ]
+                    )
+            ],
+            security = [@SecurityRequirement(name = 'openIdConnect')]
+    )
+    @Path("/sandbox/upload")
+    @Produces("application/json")
+    @RequireApiKey
+    def upload() {
+        if (!spatialConfig.sandboxEnabled) {
+            return [error: "Sandbox is disabled"] as JSON
+        }
+
+        // Use linked hash map to maintain key ordering
+        Map retMap = new LinkedHashMap()
+
+        // Parse the request
+        Map items = request.getFileMap()
+
+        if (items.size() == 1) {
+            MultipartFile fileItem = items.values()[0]
+
+            SandboxIngress info = sandboxService.upload(fileItem, (String) params.name, authService.getUserId())
+            if (info) {
+                render info as JSON
+                return
+            } else {
+                retMap.put("error", "Failed to upload file")
+            }
+        } else {
+            retMap.put("error", items.size() + " files sent in request. A single zipped CSV file should be supplied.")
+        }
+
+        render retMap as JSON
+    }
+
+    // delete service
+    @Operation(
+            method = "DELETE",
+            tags = "uploads",
+            operationId = "deleteCSV",
+            summary = "Delete a CSV or zipped CSV file containing occurrence points",
+            parameters = [
+                    @Parameter(
+                            name = "id",
+                            in = QUERY,
+                            description = "datasetId",
+                            schema = @Schema(implementation = String),
+                            required = true
+                    )
+            ],
+            responses = [
+                    @ApiResponse(
+                            description = "Delete the file",
+                            responseCode = "200",
+                            content = [
+                                    @Content(
+                                            mediaType = "application/json"
+                                    )
+                            ]
+                    )
+            ],
+            security = [@SecurityRequirement(name = 'openIdConnect')]
+    )
+    @Path("/sandbox/delete")
+    @Produces("application/json")
+    @RequireApiKey
+    def delete() {
+        if (!spatialConfig.sandboxEnabled) {
+            return [error: "Sandbox is disabled"] as JSON
+        }
+
+        // Use linked hash map to maintain key ordering
+        Map retMap = new LinkedHashMap()
+
+        // Parse the request
+        String id = params.id
+
+        if (id) {
+            boolean successful = sandboxService.delete(id, authService.getUserId(), authService.userInRole("ROLE_ADMIN"))
+            if (successful) {
+                retMap.put("message", "File deleted")
+                render retMap as JSON
+                return
+            }
+        } else {
+            retMap.put("error", "No file id supplied")
+        }
+
+        render retMap as JSON, status: 500
+    }
+
+    // status service
+    @Operation(
+            method = "GET",
+            tags = "uploads",
+            operationId = "statusCSV",
+            summary = "Get the status of a CSV or zipped CSV file containing occurrence points",
+            parameters = [
+                    @Parameter(
+                            name = "id",
+                            in = QUERY,
+                            description = "datasetId",
+                            schema = @Schema(implementation = String),
+                            required = true
+                    )
+            ],
+            responses = [
+                    @ApiResponse(
+                            description = "Status of the file",
+                            responseCode = "200",
+                            content = [
+                                    @Content(
+                                            mediaType = "application/json"
+                                    )
+                            ]
+                    )
+            ],
+            security = [@SecurityRequirement(name = 'openIdConnect')]
+    )
+    @Path("/sandbox/status")
+    @Produces("application/json")
+    def status() {
+        if (!spatialConfig.sandboxEnabled) {
+            return [error: "Sandbox is disabled"] as JSON
+        }
+
+        // Use linked hash map to maintain key ordering
+        Map retMap = new LinkedHashMap()
+
+        // Parse the request
+        String id = params.id
+
+        if (id) {
+            SandboxIngress info = sandboxService.getStatus(id)
+            if (info) {
+                render info as JSON
+                return
+            } else {
+                retMap.put("error", "Failed to get status")
+            }
+        } else {
+            retMap.put("error", "No file id supplied")
+        }
+
+        render retMap as JSON, status: 500
+    }
+}
diff --git a/grails-app/services/au/org/ala/spatial/SandboxService.groovy b/grails-app/services/au/org/ala/spatial/SandboxService.groovy
new file mode 100644
index 0000000..f87f134
--- /dev/null
+++ b/grails-app/services/au/org/ala/spatial/SandboxService.groovy
@@ -0,0 +1,683 @@
+package au.org.ala.spatial
+
+import au.org.ala.spatial.dto.SandboxIngress
+import au.org.ala.ws.service.WebService
+import com.opencsv.CSVReader
+import com.opencsv.CSVWriter
+import org.apache.commons.io.FileUtils
+import org.apache.commons.io.IOUtils
+import org.apache.commons.lang3.StringUtils
+import org.apache.http.entity.ContentType
+import org.gbif.dwc.terms.Term
+import org.gbif.dwc.terms.TermFactory
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+import org.springframework.http.HttpMethod
+import org.springframework.http.ResponseEntity
+import org.springframework.web.client.RestTemplate
+import org.springframework.web.multipart.MultipartFile
+
+import javax.annotation.PostConstruct
+import java.nio.charset.Charset
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.SynchronousQueue
+import java.util.concurrent.ThreadPoolExecutor
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.zip.ZipEntry
+import java.util.zip.ZipFile
+import java.util.zip.ZipOutputStream
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS
+
+class SandboxService {
+    private static final Logger logger = LoggerFactory.getLogger(SandboxService.class)
+
+    SpatialConfig spatialConfig
+    WebService webService
+
+    // The sandbox queue is deliberately kept separate from other task processing.
+    // GeneratePoints can also run a sandbox import independently of this executor.
+    ThreadPoolExecutor executorService
+
+    // Unique Id for each request
+    AtomicInteger requestId = new AtomicInteger(0)
+
+    // the requestId of the most recent item in the queue
+    AtomicInteger queuePosition = new AtomicInteger(0)
+
+    // this method is only valid for the life of the application
+    SandboxIngress getStatus(String dataResourceUid) {
+        if (!isValidUUID(dataResourceUid)) {
+            return null;
+        }
+
+        // get the status of the data resource
+        StatusItem statusItem = queueItemStatus.get(dataResourceUid);
+        if (statusItem != null) {
+            // update the status when the item is still in the queue
+            if (statusItem.sandboxIngress.requestId > queuePosition.get()) {
+                statusItem.sandboxIngress.status = "queued";
+                statusItem.sandboxIngress.message = "waiting for " + (statusItem.sandboxIngress.requestId - queuePosition.get()) + " items to finish";
+            }
+
+            return statusItem.sandboxIngress;
+        }
+
+        return null;
+    }
+
+    class StatusItem {
+        SandboxIngress sandboxIngress;
+        Runnable runnable;
+
+        StatusItem(SandboxIngress sandboxIngress, Runnable runnable) {
+            this.sandboxIngress = sandboxIngress;
+            this.runnable = runnable;
+        }
+    }
+
+    Map queueItemStatus = new ConcurrentHashMap<>()
+
+    @PostConstruct
+    void init() {
+        // create the required directories
+
+        File uploadDir = new File(spatialConfig.data.dir + "/sandbox/upload");
+        if (!uploadDir.exists()) {
+            uploadDir.mkdirs();
+        }
+
+        File processingDir = new File(spatialConfig.data.dir + "/sandbox/processed");
+        if (!processingDir.exists()) {
+            processingDir.mkdirs();
+        }
+
+        File tmpDir = new File(spatialConfig.data.dir + "/sandbox/tmp");
+        if (!tmpDir.exists()) {
+            tmpDir.mkdirs();
+        }
+
+        // setup executor
+        executorService = new ThreadPoolExecutor(1, 100, 0, MILLISECONDS,
+                new SynchronousQueue<>(),
+                new ThreadPoolExecutor.AbortPolicy());
+    }
+
+    boolean isValidUUID(String uuid) {
+        // validate that uuid is a correctly formed UUID
+        try {
+            return UUID.fromString(uuid).toString().equals(uuid);
+        } catch (Exception ignored) {
+            return false;
+        }
+    }
+
+    SandboxIngress upload(MultipartFile file, String datasetName, String userId) throws IOException {
+        // get uuid
+        String uuid = UUID.randomUUID().toString();
+
+        SandboxIngress sandboxIngress = new SandboxIngress();
+        sandboxIngress.setId(uuid);
+        sandboxIngress.setUserId(userId);
+        sandboxIngress.setDataResourceUid(UUID.randomUUID().toString());
+        sandboxIngress.setDescription(datasetName);
+        sandboxIngress.setStatusUrl(spatialConfig.spatialService.url + "/sandbox/status/" + sandboxIngress.getDataResourceUid());
+        sandboxIngress.setRequestId(requestId.incrementAndGet())
+
+        if (file.getOriginalFilename().toLowerCase().endsWith(".csv")) {
+            importCsv(file, sandboxIngress);
+        } else if (file.getOriginalFilename().toLowerCase().endsWith(".zip")) {
+            importZip(file, sandboxIngress);
+        } else {
+            throw new Exception("Unsupported file type: " + file.getOriginalFilename())
+        }
+
+        Runnable task = new Runnable() {
+            @Override
+            void run() {
+                // update queue position, if necessary
+                if (queuePosition.get() < sandboxIngress.getRequestId()) {
+                    queuePosition.set(sandboxIngress.getRequestId());
+                }
+
+                ingress(sandboxIngress);
+            }
+        }
+
+        queueItemStatus.put(sandboxIngress.getDataResourceUid(), new StatusItem(sandboxIngress, task));
+
+        executorService.execute(task);
+
+        return sandboxIngress;
+    }
+
+    /**
+     * import a CSV immediately, returning the dataResourceUid when done
+     *
+     * @param csv a CSV with a header and at least decimalLatitude and decimalLongitude columns
+     * @param datasetName
+     * @param userId
+     * @return a SandboxIngress with dataResourceUid, status (for errors), etc
+     * @throws IOException
+     */
+    SandboxIngress importPoints(String csv, String datasetName, String userId) throws IOException {
+        // get uuid
+        String uuid = UUID.randomUUID().toString();
+
+        SandboxIngress sandboxIngress = new SandboxIngress();
+        sandboxIngress.setId(uuid);
+        sandboxIngress.setUserId(userId);
+        sandboxIngress.setDataResourceUid(UUID.randomUUID().toString());
+        sandboxIngress.setDescription(datasetName);
+        sandboxIngress.setStatusUrl(spatialConfig.spatialService.url + "/sandbox/status/" + sandboxIngress.getDataResourceUid());
+
+        // put the csv into a MultipartFile object
+        MultipartFile file = new MultipartFile() {
+            @Override
+            String getName() {
+                return "points.csv";
+            }
+
+            @Override
+            String getOriginalFilename() {
+                return "points.csv";
+            }
+
+            @Override
+            String getContentType() {
+                return "text/csv";
+            }
+
+            @Override
+            boolean isEmpty() {
+                return false;
+            }
+
+            @Override
+            long getSize() {
+                return csv.length();
+            }
+
+            @Override
+            byte[] getBytes() {
+                return csv.getBytes(Charset.defaultCharset());
+            }
+
+            @Override
+            InputStream getInputStream() {
+                return new ByteArrayInputStream(csv.getBytes(Charset.defaultCharset()));
+            }
+
+            @Override
+            void transferTo(File dest) throws IOException {
+                FileUtils.write(dest, csv, Charset.defaultCharset());
+            }
+        };
+
+        importCsv(file, sandboxIngress);
+
+        ingress(sandboxIngress);
+
+        return sandboxIngress;
+    }
+
+    void importZip(MultipartFile file, SandboxIngress sandboxIngress) {
+        // upload and unzip file; use the dataResourceUid directory so ingress() can find it
+        File thisDir = new File(spatialConfig.data.dir + "/sandbox/upload/" + sandboxIngress.getDataResourceUid());
+        try {
+            FileUtils.forceMkdir(thisDir);
+            File zipFile = new File(thisDir, "archive.zip");
+            file.transferTo(zipFile);
+
+            // unzip into thisDir, the files "meta.xml" and "occurrences.txt"
+            ZipFile zip = new ZipFile(zipFile);
+
+            // check if this is a DwCA by finding "meta.xml" in the zip file
+            Enumeration entries = zip.entries();
+            while (entries.hasMoreElements()) {
+                ZipEntry entry = entries.nextElement();
+                if (entry.getName().toLowerCase().endsWith("meta.xml")) {
+                    sandboxIngress.setIsDwCA(true);
+                    break;
+                }
+            }
+
+            // treat as a csv or tsv file when not a DwCA
+            if (!sandboxIngress.getIsDwCA()) {
+                entries = zip.entries();
+
+                while (entries.hasMoreElements()) {
+                    ZipEntry entry = entries.nextElement();
+                    File entryFile = new File(thisDir, entry.getName());
+
+                    if (!entry.isDirectory()) {
+                        InputStream input = zip.getInputStream(entry);
+                        OutputStream out = new FileOutputStream(entryFile);
+                        IOUtils.copy(input, out);
+                        IOUtils.closeQuietly(input);
+                        IOUtils.closeQuietly(out);
+                    }
+                }
+
+                sandboxIngress.setIsDwCA(false);
+
+                String[] header = null;
+
+                // look for a single csv or tsv file
+                File[] files = thisDir.listFiles();
+                for (File f : files) {
+                    if (f.getName().toLowerCase().endsWith(".csv")) {
+                        // convert to occurrence.tsv, meta.xml and archive.zip
+                        header = convertCsvToDwCA(f, thisDir, sandboxIngress.getUserId(), sandboxIngress.getDescription());
+                    } else if (f.getName().toLowerCase().endsWith(".tsv")) {
+                        BufferedReader br = new BufferedReader(new FileReader(f));
+                        header = br.readLine().split("\t");
+                    }
+                }
+                if (header != null) {
+                    sandboxIngress.setHeaders(interpretHeader(header));
+                } else {
+                    sandboxIngress = null;
+
+                    logger.error("Error interpreting header: " + thisDir.getAbsolutePath());
+                }
+            }
+        } catch (IOException e) {
+            sandboxIngress = null;
+            logger.error("Error importing ZIP file: " + thisDir.getAbsolutePath(), e);
+        }
+
+        // delete directory on error
+        if (sandboxIngress == null) {
+            try {
+                FileUtils.deleteDirectory(thisDir);
+            } catch (IOException e) {
+                logger.error("Error deleting directory: " + thisDir.getAbsolutePath(), e);
+            }
+        }
+    }
+
+    void importCsv(MultipartFile file, SandboxIngress si) throws IOException {
+        try {
+            File thisDir = new File(spatialConfig.data.dir + "/sandbox/upload/" + si.dataResourceUid);
+            FileUtils.forceMkdir(thisDir);
+            File csvFile = new File(thisDir, "occurrences.csv");
+            file.transferTo(csvFile);
+
+            // convert csv to a DwCA (occurrence.tsv, meta.xml and archive.zip)
+            si.setHeaders(convertCsvToDwCA(csvFile, thisDir, si.getUserId(), si.getDescription()));
+        } catch (IOException e) {
+            logger.error("Error importing CSV file", e);
+        }
+
+        // update sandboxIngress
+        si.setIsDwCA(false);
+    }
+
+    String[] convertCsvToDwCA(File csvFile, File thisDir, String userID, String datasetName) throws IOException {
+        CSVReader reader = null;
+        CSVWriter writer = null;
+
+        String[] header = null;
+        try {
+            // convert csv to tsv
+            File tsvFile = new File(thisDir, "occurrence.tsv");
+            reader = new CSVReader(new FileReader(csvFile));
+            writer = new CSVWriter(new FileWriter(tsvFile), '\t' as char);
+
+            String[] nextLine;
+            int occurrenceIDIndex = -1;
+            int occurrenceIDIndexNew = -1;
+            int userIDIndex = -1;
+            int datasetNameIndex = -1;
+            int row = 0;
+            while ((nextLine = reader.readNext()) != null) {
+                // First row is the header
+                if (row == 0) {
+                    header = interpretHeader(nextLine);
+
+                    // Append occurrenceID to the header, if absent
+                    String occurrenceIDQualified = TermFactory.instance().findTerm("occurrenceID").qualifiedName();
+                    occurrenceIDIndex = Arrays.asList(header).indexOf(occurrenceIDQualified);
+                    if (occurrenceIDIndex < 0) {
+                        String[] newHeader = new String[header.length + 1];
+                        System.arraycopy(header, 0, newHeader, 0, header.length);
+                        newHeader[header.length] = occurrenceIDQualified;
+                        header = newHeader;
+                    }
+
+                    // Append userID to the header, if absent
+                    String userIDQualified = TermFactory.instance().findTerm("userId").qualifiedName();
+                    userIDIndex = Arrays.asList(header).indexOf(userIDQualified);
+                    if (userIDIndex < 0) {
+                        String[] newHeader = new String[header.length + 1];
+                        System.arraycopy(header, 0, newHeader, 0, header.length);
+                        newHeader[header.length] = userIDQualified;
+                        header = newHeader;
+                    }
+
+                    // Append datasetName to the header, if absent
+                    String datasetNameQualified = TermFactory.instance().findTerm("datasetName").qualifiedName();
+                    datasetNameIndex = Arrays.asList(header).indexOf(datasetNameQualified);
+                    if (datasetNameIndex < 0) {
+                        String[] newHeader = new String[header.length + 1];
+                        System.arraycopy(header, 0, newHeader, 0, header.length);
+                        newHeader[header.length] = datasetNameQualified;
+                        header = newHeader;
+                    }
+                } else {
+                    // Append row number as the unique occurrenceID
+                    if (occurrenceIDIndex < 0) {
+                        String[] newLine = new String[nextLine.length + 1];
+                        occurrenceIDIndexNew = nextLine.length;
+
+                        System.arraycopy(nextLine, 0, newLine, 0, nextLine.length);
+                        newLine[nextLine.length] = Integer.toString(row);
+                        nextLine = newLine;
+                    } else {
+                        // replace occurrenceID with the row number to prevent errors
+                        nextLine[occurrenceIDIndex] = Integer.toString(row);
+
+                        occurrenceIDIndexNew = occurrenceIDIndex;
+                    }
+
+                    // Append ALA userID as the userID
+                    if (userIDIndex < 0) {
+                        String[] newLine = new String[nextLine.length + 1];
+                        System.arraycopy(nextLine, 0, newLine, 0, nextLine.length);
+                        newLine[nextLine.length] = userID;
+                        nextLine = newLine;
+                    } else {
+                        // replace userID with ALA userID
+                        nextLine[userIDIndex] = userID;
+                    }
+
+                    // Append datasetName to the row
+                    if (datasetNameIndex < 0) {
+                        String[] newLine = new String[nextLine.length + 1];
+                        System.arraycopy(nextLine, 0, newLine, 0, nextLine.length);
+                        newLine[nextLine.length] = datasetName;
+                        nextLine = newLine;
+                    } else {
+                        // replace datasetName with the datasetName
+                        nextLine[datasetNameIndex] = datasetName;
+                    }
+
+                    writer.writeNext(nextLine);
+                }
+
+                row++;
+            }
+
+            writer.flush();
+
+            // create meta.xml
+            StringBuilder sb = new StringBuilder();
+            sb.append("<archive xmlns=\"http://rs.tdwg.org/dwc/text/\">\n" +
+                    "  <core encoding=\"UTF-8\" fieldsTerminatedBy=\"\\t\" linesTerminatedBy=\"\\n\" ignoreHeaderLines=\"0\" rowType=\"http://rs.tdwg.org/dwc/terms/Occurrence\">\n" +
+                    "    <files>\n" +
+                    "      <location>occurrence.tsv</location>\n" +
+                    "    </files>\n" +
+                    "    <id index=\"" + occurrenceIDIndexNew + "\"/>\n");
+            for (int i = 0; i < header.length; i++) {
+                sb.append("    <field index=\"" + i + "\" term=\"" + header[i] + "\"/>\n");
+            }
+            sb.append("  </core>\n" +
+                    "</archive>");
+            FileUtils.write(new File(csvFile.getParent(), "meta.xml"), sb.toString(), "UTF-8");
+
+            // create the zip file
+            File zipFile = new File(thisDir, "archive.zip");
+            FileOutputStream fos = new FileOutputStream(zipFile);
+            ZipOutputStream zipOut = new ZipOutputStream(fos);
+
+            ZipEntry entry = new ZipEntry("occurrence.tsv");
+            zipOut.putNextEntry(entry);
+            FileInputStream input = new FileInputStream(tsvFile);
+            IOUtils.copy(input, zipOut);
+            IOUtils.closeQuietly(input);
+            zipOut.closeEntry();
+
+            entry = new ZipEntry("meta.xml");
+            zipOut.putNextEntry(entry);
+            input = new FileInputStream(new File(thisDir, "meta.xml"));
+            IOUtils.copy(input, zipOut);
+            IOUtils.closeQuietly(input);
+            zipOut.closeEntry();
+
+            zipOut.close();
+        } catch (IOException e) {
+            logger.error("Error importing CSV file", e);
+        }
+
+        if (reader != null) {
+            reader.close();
+        }
+
+        if (writer != null) {
+            writer.close();
+        }
+
+        return header;
+    }
+
+    private String[] interpretHeader(String[] header) {
+        TermFactory factory = TermFactory.instance();
+
+        String[] matched = new String[header.length];
+        for (int i = 0; i < header.length; i++) {
+            Term term = factory.findTerm(header[i]);
+            if (term != null) {
+                matched[i] = term.qualifiedName();
+            } else {
+                matched[i] = header[i];
+            }
+        }
+        return matched;
+    }
+
+    String getUserId(String id) {
+        if (!isValidUUID(id)) {
+            return null;
+        }
+
+        // get userId from SOLR
+        Map resp = webService.get(spatialConfig.sandboxSolrUrl + "/" + spatialConfig.sandboxSolrCollection + "/select?q=dataResourceUid%3A" + id + "&fl=userId&rows=1", null, ContentType.APPLICATION_JSON, false, false, null);
+
+        if (resp?.resp != null && resp.resp.containsKey("response") && resp.resp.get("response") instanceof Map) {
+            Map response = (Map) resp.resp.get("response");
+            if (response.containsKey("docs") && response.get("docs") instanceof List) {
+                List docs = (List) response.get("docs");
+                if (docs.size() > 0) {
+                    Map doc = (Map) docs.get(0);
+                    if (doc.containsKey("userId")) {
+                        return (String) doc.get("userId");
+                    }
+                }
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Delete a sandbox data resource, from the /upload/ directory and SOLR.
+     *
+     * @param id upload UUID (dataResourceUid)
+     * @param userId user id of the caller; must match the uploader unless isAdmin is true
+     * @param isAdmin true when the caller has ROLE_ADMIN
+     * @return true when the data resource was deleted
+     */
+    boolean delete(String id, String userId, boolean isAdmin) {
+        if (!isValidUUID(id)) {
+            return false;
+        }
+
+        // check that the user owns this data resource or is admin
+        String drUserId = getUserId(id);
+        if (!isAdmin && userId != drUserId) {
+            return false;
+        }
+
+        // delete the file uploaded
+        File thisDir = new File(spatialConfig.data.dir + "/sandbox/upload/" + id);
+        try {
+            if (thisDir.exists()) {
+                FileUtils.deleteDirectory(thisDir);
+            }
+        } catch (IOException e) {
+            logger.error("Error deleting directory: " + thisDir.getAbsolutePath(), e);
+        }
+
+        // delete from SOLR
+        String json = '{"delete":{"query":"dataResourceUid:' + id + '"}}'
+        Map resp = webService.post(spatialConfig.sandboxSolrUrl + "/" + spatialConfig.sandboxSolrCollection + "/update?commit=true", json, null, ContentType.APPLICATION_JSON, false, false, null)
+
+        return resp != null && resp.statusCode == 200
+    }
+
+    /**
+     * Load a DwCA into the pipelines
+     *
+     * @param sandboxIngress
+     */
+    def ingress(SandboxIngress sandboxIngress) {
+        String datasetID = sandboxIngress.getDataResourceUid();
+
+        String[] dwcaToSandboxOpts = new String[]{
+                "au.org.ala.pipelines.java.DwcaToSolrPipeline",
+                "--datasetId=" + datasetID,
+                "--targetPath=" + spatialConfig.data.dir + "/sandbox/processed",
+                "--inputPath=" + spatialConfig.data.dir + "/sandbox/upload/" + datasetID,
+                "--solrCollection=" + spatialConfig.sandboxSolrCollection,
+                "--solrHost=" + spatialConfig.sandboxSolrUrl,
+                "--includeSampling=true",
+                "--config=" + spatialConfig.data.dir + "/la-pipelines/config/la-pipelines.yaml",
+                "--maxThreadCount=" + spatialConfig.sandboxThreadCount
+        }
+        sandboxIngress.status = "running"
+        sandboxIngress.message = "started"
+        int result = pipelinesExecute(dwcaToSandboxOpts, new File(spatialConfig.data.dir + "/sandbox/processed/" + datasetID + "/logs/DwcaToSolrPipeline.log"), sandboxIngress)
+
+        if (result != 0) {
+            sandboxIngress.status = "error"
+            sandboxIngress.message = "DwcaToSolrPipeline failed"
+            return
+        }
+
+        // delete processing files
+        File processedDir = new File(spatialConfig.data.dir + "/sandbox/processed/" + datasetID);
+        try {
+            if (processedDir.exists()) {
+                FileUtils.deleteDirectory(processedDir);
+            }
+        } catch (IOException e) {
+            logger.error("Error deleting directory: " + processedDir.getAbsolutePath(), e);
+        }
+
+        // double check SOLR
+        try {
+            long sleepMs = 500; // 0.5s
+            Thread.sleep(sleepMs);
+            int maxWaitRetry = 100; // 100x 0.5s = 50s max wait in this loop
+            int retry = 0;
+            while (retry < maxWaitRetry) {
+                ResponseEntity response = new RestTemplate().exchange(
+                        spatialConfig.sandboxSolrUrl + "/" + spatialConfig.sandboxSolrCollection + "/select?q=dataResourceUid:" + sandboxIngress.getDataResourceUid(),
+                        HttpMethod.GET,
+                        null,
+                        Map.class);
+
+                if (response.getStatusCode().is2xxSuccessful() &&
+                        ((Integer) ((Map) response.getBody().get("response")).get("numFound")) > 0) {
+                    int solrCount = ((Integer) ((Map) response.getBody().get("response")).get("numFound"));
+                    logger.info("SOLR import successful: " + solrCount + " records");
+
+                    sandboxIngress.status = "finished";
+                    sandboxIngress.message = "SOLR import successful: " + solrCount + " records (subject to in progress indexing)"
+
+                    return solrCount;
+                }
+                Thread.sleep(sleepMs);
+                retry++;
+            }
+        } catch (Exception e) {
+            logger.error("SOLR request failed: " + e.getMessage());
+        }
+
+        sandboxIngress.status = "error";
sandboxIngress.message = "SOLR import failed (or timed out)" + + return 0; + } + + int pipelinesExecute(String[] opts, File logFile, SandboxIngress sandboxIngress) { + String [] prefix = spatialConfig.pipelinesCmd.split(" "); + String [] cmd = new String[prefix.length + opts.length + 1]; + System.arraycopy(prefix, 0, cmd, 0, prefix.length); + System.arraycopy(opts, 0, cmd, prefix.length, opts.length); + cmd[cmd.length - 1] = spatialConfig.pipelinesConfig; + + try { + logger.info("Executing pipeline: " + StringUtils.join(cmd, " ")); + ProcessBuilder builder = new ProcessBuilder(cmd); + builder.environment().putAll(System.getenv()); + builder.redirectErrorStream(true); + + Process proc = builder.start(); + + logSandboxStream(proc.getInputStream(), logFile, sandboxIngress); + + return proc.waitFor(); + } catch (Exception e) { + logger.error("Error executing pipeline: " + Arrays.toString(cmd), e); + throw new RuntimeException(e); + } + + return 1; // error + } + + private static void logSandboxStream(InputStream stream, File logFile, SandboxIngress sandboxIngress) { + new Thread(() -> { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream))) { + String line; + + if (logFile != null) { + logFile.getParentFile().mkdirs(); + + try (FileWriter writer = new FileWriter(logFile)) { + while ((line = reader.readLine()) != null) { + writer.write(line + "\n"); + + if (sandboxIngress != null && line.contains("PROGRESS")) { + sandboxIngress.message = line.substring(line.indexOf("PROGRESS") + 10); + } + } + + writer.flush(); + } catch (IOException e) { + logger.error("Error writing log file", e); + } + } else { + while ((line = reader.readLine()) != null) { + if (sandboxIngress != null && line.contains("PROGRESS")) { + sandboxIngress.message = line.substring(line.indexOf("PROGRESS")); + } + } + } + } catch (IOException e) { + logger.error("Error reading stream", e); + } + }).start(); + } + + // wait a bit, but not too long, to allow stuff to finish, maybe + void smallSleep(long ms) { + try { + Thread.sleep(ms); + } catch (InterruptedException e) { + logger.error("Error waiting", e); + } + } +} diff --git a/grails-app/services/au/org/ala/spatial/TaskQueueService.groovy b/grails-app/services/au/org/ala/spatial/TaskQueueService.groovy index 75aeea2..1deb0c4 100644 --- a/grails-app/services/au/org/ala/spatial/TaskQueueService.groovy +++ b/grails-app/services/au/org/ala/spatial/TaskQueueService.groovy @@ -178,6 +178,7 @@ class TaskQueueService { operator.tabulationGeneratorService = tasksService.tabulationGeneratorService operator.fileService = tasksService.fileService operator.webService = tasksService.webService + operator.sandboxService = tasksService.sandboxService //start operator.start() diff --git a/grails-app/services/au/org/ala/spatial/TasksService.groovy b/grails-app/services/au/org/ala/spatial/TasksService.groovy index 758a094..21a0e1e 100644 --- a/grails-app/services/au/org/ala/spatial/TasksService.groovy +++ b/grails-app/services/au/org/ala/spatial/TasksService.groovy @@ -49,6 +49,7 @@ class TasksService { TabulationGeneratorService tabulationGeneratorService FileService fileService WebService webService + SandboxService sandboxService PublishService publishService TaskQueueService taskQueueService diff --git a/src/main/groovy/au/org/ala/spatial/SpatialConfig.groovy b/src/main/groovy/au/org/ala/spatial/SpatialConfig.groovy index 5adb72d..7065fa7 100644 --- a/src/main/groovy/au/org/ala/spatial/SpatialConfig.groovy +++ b/src/main/groovy/au/org/ala/spatial/SpatialConfig.groovy 
@@ -217,5 +217,10 @@ class SpatialConfig {
     String biocacheUrl
 
-
+    Boolean sandboxEnabled
+    String sandboxSolrUrl
+    String pipelinesCmd
+    String pipelinesConfig
+    String sandboxSolrCollection
+    Integer sandboxThreadCount
 }
diff --git a/src/main/groovy/au/org/ala/spatial/dto/SandboxIngress.groovy b/src/main/groovy/au/org/ala/spatial/dto/SandboxIngress.groovy
new file mode 100644
index 0000000..f928269
--- /dev/null
+++ b/src/main/groovy/au/org/ala/spatial/dto/SandboxIngress.groovy
@@ -0,0 +1,17 @@
+package au.org.ala.spatial.dto;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+
+@JsonInclude(JsonInclude.Include.NON_EMPTY)
+class SandboxIngress {
+    String id; // id of the uploaded file that is one of DwCA, CSV, TSV
+    String description; // user provided description of the file
+    String[] headers; // when isDwCA==false this corresponds to the DwCA meta.xml for occurrences.txt
+    String userId; // user id of the person who uploaded the file
+    Boolean isDwCA; // true if the file is a DwCA
+    String dataResourceUid; // dataResourceUid as it is loaded into SOLR
+    String status; // status of the import process
+    String message; // status message of the import process
+    String statusUrl; // url to check the status of the import process
+    Integer requestId; // id of the request
+}
diff --git a/src/main/groovy/au/org/ala/spatial/intersect/SimpleRegion.groovy b/src/main/groovy/au/org/ala/spatial/intersect/SimpleRegion.groovy
index 22ccb71..8c13a67 100644
--- a/src/main/groovy/au/org/ala/spatial/intersect/SimpleRegion.groovy
+++ b/src/main/groovy/au/org/ala/spatial/intersect/SimpleRegion.groovy
@@ -1018,8 +1018,7 @@ class SimpleRegion implements Serializable {
                 }
             } else { //sloped line
                 endlat = dy2
-                int kStep = Math.max(divy, 1)
-                for (double k = (y + 1) * divy + latitude1; k < endlat; k += kStep) {
+                for (double k = (y + 1) * divy + latitude1; k < endlat; k += (int)(divy)) {
                     //move in yDirection to get x
                     xcross = (k - intercept) / slope
                     icross = (int) ((xcross - longitude1) / divx)
@@ -1141,8 +1140,7 @@ class SimpleRegion implements Serializable {
                 }
             } else { //sloped line
                 endlat = dy2
-                int kStep = Math.max(divy, 1)
-                for (double k = (y + 1) * divy + latitude1; k < endlat; k += kStep) {
+                for (double k = (y + 1) * divy + latitude1; k < endlat; k += (int)(divy)) {
                     //move in yDirection to get x
                     xcross = (k - intercept) / slope
                     icross = (int) ((xcross - longitude1) / divx)
diff --git a/src/main/groovy/au/org/ala/spatial/process/GeneratePoints.groovy b/src/main/groovy/au/org/ala/spatial/process/GeneratePoints.groovy
index c15b2d4..92eaa4c 100644
--- a/src/main/groovy/au/org/ala/spatial/process/GeneratePoints.groovy
+++ b/src/main/groovy/au/org/ala/spatial/process/GeneratePoints.groovy
@@ -16,15 +16,12 @@
 package au.org.ala.spatial.process
 
 import au.org.ala.spatial.dto.AreaInput
-import au.org.ala.spatial.Util
+import au.org.ala.spatial.dto.SandboxIngress
 import au.org.ala.spatial.intersect.SimpleRegion
 import au.org.ala.spatial.intersect.SimpleShapeFile
 import grails.converters.JSON
 import groovy.util.logging.Slf4j
-import org.apache.commons.httpclient.NameValuePair
-import org.grails.web.json.JSONObject
 
-//@CompileStatic
 @Slf4j
 class GeneratePoints extends SlaveProcess {
 
@@ -35,8 +32,6 @@ class GeneratePoints extends SlaveProcess {
         Double distance = getInput('distance').toString().toDouble()
         String userId = getInput('userId')
-        String sandboxBiocacheServiceUrl = getInput('sandboxBiocacheServiceUrl')
-        String sandboxHubUrl = getInput('sandboxHubUrl')
 
         double[] bbox = JSON.parse(area[0].bbox) as double[]
@@ -44,7 +39,7 @@ class GeneratePoints extends SlaveProcess {
         SimpleRegion simpleArea = SimpleShapeFile.parseWKT(wkt)
 
         // dump the data to a file
-        taskWrapper.task.message = "Loading area ..."
+        taskLog("Loading area ...")
 
         List<List<Double>> points = []
         for (double x = bbox[0]; x <= bbox[2]; x += distance) {
@@ -54,74 +49,39 @@ class GeneratePoints extends SlaveProcess {
                 }
             }
         }
-        taskWrapper.task.history.put(System.currentTimeMillis() as String, points.size() + " points have been created.")
+        taskLog(points.size() + " points have been created.")
 
-        uploadPoints(sandboxBiocacheServiceUrl, sandboxHubUrl, userId, points, area.name, distance)
+        uploadPoints(userId, points, area.name, distance)
     }
 
-    def uploadPoints(String sandboxBiocacheServiceUrl, String sandboxHubUrl, String userId, List<List<Double>> points, areaName, distance) {
-        //upload
+    def uploadPoints(String userId, List<List<Double>> points, areaName, distance) {
+        taskLog("Uploading points to sandbox ...")
+
+        // build csv content
         StringBuilder sb = new StringBuilder()
+        sb.append("decimalLongitude,decimalLatitude")
         points.each {
             if (sb.size() > 0) sb.append("\n")
             sb.append(it[0]).append(",").append(it[1])
         }
 
         def name = "Points in ${areaName} on ${distance} degree grid"
-        NameValuePair[] nameValuePairs = [
-                new NameValuePair("csvData", sb.toString()),
-                new NameValuePair("headers", "decimalLongitude,decimalLatitude"),
-                new NameValuePair("datasetName", name),
-                new NameValuePair("separator", ","),
-                new NameValuePair("firstLineIsData", "false"),
-                new NameValuePair("customIndexedFields", ""),
-                new NameValuePair("uiUrl", spatialConfig.spatialService.url),
-                new NameValuePair("alaId", userId.toString())
-        ]
-
-        taskWrapper.task.history.put(System.currentTimeMillis() as String, "Uploading points to sandbox: ${sandboxBiocacheServiceUrl}".toString())
-
-        def response = Util.urlResponse("POST", "${sandboxBiocacheServiceUrl}/upload/post", nameValuePairs)
-
-        if (response) {
-            if (response.statusCode != 200) {
-                taskWrapper.task.message = "Error"
-                taskWrapper.task.history.put(System.currentTimeMillis() as String, response.statusCode + " : " + response.text?.toString()?.substring(0, 200))
-                return
-            }
-            def dataResourceUid = ((JSONObject) JSON.parse(response.text as String)).uid
-            taskWrapper.task.history.put(System.currentTimeMillis() as String, "Sandbox data resource uid:" + dataResourceUid)
-
-            //wait
-            def statusUrl = "${sandboxBiocacheServiceUrl}/upload/status/${dataResourceUid}"
-            def start = System.currentTimeMillis()
-            def maxTime = 60 * 60 * 1000 //2hr
-            taskWrapper.task.message = "Uploading ..."
-            while (start + maxTime > System.currentTimeMillis()) {
-                Thread.sleep(10000) // 10s
-                taskWrapper.task.history.put(System.currentTimeMillis() as String, "checking status of " + statusUrl)
-                def txt = Util.getUrl(statusUrl)
-                if (txt == null) {
-                    // retry
-                } else if (txt.contains("COMPLETE")) {
-                    taskWrapper.task.history.put(System.currentTimeMillis() as String, "Uploading completed")
-                    //add species layer
-                    def species = [q   : "data_resource_uid:${dataResourceUid}",
-                                   ws  : sandboxHubUrl,
-                                   bs  : sandboxBiocacheServiceUrl,
-                                   name: name]
-                    addOutput("species", (species as JSON).toString())
-                    log.debug(species.inspect())
-                    break
-                } else if (txt.contains("FAILED")) {
-                    log.error(txt)
-                    taskWrapper.task.message = "failed upload " + statusUrl
-                    break
-                } else {
-                    log.error(txt)
-                    JSONObject json = JSON.parse(txt) as JSONObject
-                    taskWrapper.task.message = json.status + ": " + json.description
-                }
-            }
-        }
+
+        SandboxIngress si = sandboxService.importPoints(sb.toString(), name, userId)
+
+        // check for error
+        if (si == null || si.status != "finished") {
+            taskLog("Error uploading points to sandbox")
+            throw new Exception("Error uploading points to sandbox")
+        }
+
+        //add species layer
+        def species = [q   : "dataResourceUid:${si.dataResourceUid}",
+                       ws  : spatialConfig.sandboxHubUrl,
+                       bs  : spatialConfig.sandboxBiocacheServiceUrl,
+                       name: name]
+        addOutput("species", (species as JSON).toString())
+
+        taskWrapper.task.history.put(System.currentTimeMillis() as String, "Uploading completed")
     }
 }
diff --git a/src/main/groovy/au/org/ala/spatial/process/SlaveProcess.groovy b/src/main/groovy/au/org/ala/spatial/process/SlaveProcess.groovy
index 74ff368..38c99f2 100644
--- a/src/main/groovy/au/org/ala/spatial/process/SlaveProcess.groovy
+++ b/src/main/groovy/au/org/ala/spatial/process/SlaveProcess.groovy
@@ -18,6 +18,7 @@ package au.org.ala.spatial.process
 import au.org.ala.spatial.FileService
 import au.org.ala.spatial.JournalMapService
 import au.org.ala.spatial.LayerIntersectService
+import au.org.ala.spatial.SandboxService
 import au.org.ala.spatial.dto.AreaInput
 import au.org.ala.spatial.dto.ProcessSpecification
 import au.org.ala.spatial.dto.SpeciesInput
@@ -76,6 +77,7 @@ class SlaveProcess {
     TabulationGeneratorService tabulationGeneratorService
     FileService fileService
     WebService webService
+    SandboxService sandboxService
 
     SpatialConfig spatialConfig
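[Note] Usage sketch, not part of the patch: driving the new endpoints from a client. The host, bearer token, dataset name, and file are placeholders; upload expects a single multipart file part plus a name query parameter, and status/delete take the returned dataResourceUid as id.

    // Hypothetical Groovy client for the sandbox endpoints added above.
    import java.net.http.HttpClient
    import java.net.http.HttpRequest
    import java.net.http.HttpResponse

    def host = "https://spatial.example.org/ws" // placeholder base URL
    def token = "<jwt>"                         // placeholder bearer token
    def boundary = UUID.randomUUID().toString()
    def csv = new File("points.csv").bytes      // CSV with a decimalLongitude,decimalLatitude header

    // minimal multipart/form-data body containing a single file part
    def body = new ByteArrayOutputStream()
    body << ("--${boundary}\r\nContent-Disposition: form-data; name=\"file\"; filename=\"points.csv\"\r\nContent-Type: text/csv\r\n\r\n").toString().bytes
    body << csv
    body << "\r\n--${boundary}--\r\n".toString().bytes

    def client = HttpClient.newHttpClient()
    def upload = HttpRequest.newBuilder(URI.create("${host}/sandbox/upload?name=example"))
            .header("Authorization", "Bearer ${token}")
            .header("Content-Type", "multipart/form-data; boundary=${boundary}")
            .POST(HttpRequest.BodyPublishers.ofByteArray(body.toByteArray()))
            .build()
    println client.send(upload, HttpResponse.BodyHandlers.ofString()).body()
    // The response is a SandboxIngress JSON document. Poll GET {host}/sandbox/status?id=<dataResourceUid>
    // until status is "finished" or "error"; DELETE {host}/sandbox/delete?id=<dataResourceUid> removes it.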