From 0b64f277f92655e1b23c36312a0787225b33883e Mon Sep 17 00:00:00 2001 From: Adam Collins Date: Tue, 28 Jan 2025 11:50:01 +1000 Subject: [PATCH 1/2] #250 fix points to grid --- .../org/ala/spatial/TaskQueueService.groovy | 7 + .../spatial/layers/OccurrenceDensity.groovy | 23 +-- .../ala/spatial/process/PointsToGrid.groovy | 6 +- .../au/org/ala/spatial/util/Records.groovy | 190 +++++++++--------- 4 files changed, 112 insertions(+), 114 deletions(-) diff --git a/grails-app/services/au/org/ala/spatial/TaskQueueService.groovy b/grails-app/services/au/org/ala/spatial/TaskQueueService.groovy index 1deb0c4e..893e6858 100644 --- a/grails-app/services/au/org/ala/spatial/TaskQueueService.groovy +++ b/grails-app/services/au/org/ala/spatial/TaskQueueService.groovy @@ -193,6 +193,13 @@ class TaskQueueService { taskWrapper.task.message = 'finished' taskWrapper.task.history.put(System.currentTimeMillis() as String, "finished (id:${taskWrapper.task.id})" as String) + // map output.task to task when null to prevent error when flushing task + taskWrapper.task.output.each { + if (!it.task) { + it.task = taskWrapper.task + } + } + // flush task because it is finished Task.withTransaction { if (!taskWrapper.task.save(flush: true)) { diff --git a/src/main/groovy/au/org/ala/spatial/layers/OccurrenceDensity.groovy b/src/main/groovy/au/org/ala/spatial/layers/OccurrenceDensity.groovy index e7129160..7d839a2d 100644 --- a/src/main/groovy/au/org/ala/spatial/layers/OccurrenceDensity.groovy +++ b/src/main/groovy/au/org/ala/spatial/layers/OccurrenceDensity.groovy @@ -65,8 +65,8 @@ class OccurrenceDensity { this.resolution = resolution this.bbox = bbox - width = (int) ((bbox[2] - bbox[0]) / resolution) - height = (int) ((bbox[3] - bbox[1]) / resolution) + width = (int) Math.round((bbox[2] - bbox[0]) / resolution) + height = (int) Math.round((bbox[3] - bbox[1]) / resolution) } /** @@ -81,8 +81,8 @@ class OccurrenceDensity { */ void setResolution(double resolution) { this.resolution = resolution - width = (int) ((bbox[2] - bbox[0]) / resolution) - height = (int) ((bbox[3] - bbox[1]) / resolution) + width = (int) Math.round((bbox[2] - bbox[0]) / resolution) + height = (int) Math.round((bbox[3] - bbox[1]) / resolution) } /** @@ -90,8 +90,8 @@ class OccurrenceDensity { */ void setBBox(double[] bbox) { this.bbox = bbox - width = (int) ((bbox[2] - bbox[0]) / resolution) - height = (int) ((bbox[3] - bbox[1]) / resolution) + width = (int) Math.round((bbox[2] - bbox[0]) / resolution) + height = (int) Math.round((bbox[3] - bbox[1]) / resolution) } /** @@ -143,7 +143,7 @@ class OccurrenceDensity { boolean worldwrap = (bbox[2] - bbox[0]) == 360 float[] values = new float[width] - int partCount = threadCount * 5 + int partCount = threadCount; int partSize = (int) Math.ceil(width / (double) partCount) GetValuesOccurrencesThread[] getValues = new GetValuesOccurrencesThread[threadCount] LinkedBlockingQueue lbqGetValues = new LinkedBlockingQueue() @@ -151,7 +151,6 @@ class OccurrenceDensity { int[] rowStarts = records.sortedRowStarts(bbox[1], height, resolution) for (int row = 0; row < height; row++) { - long start = System.currentTimeMillis() //get rows int[] oldRow = cRows[0] if (row == 0) { @@ -167,7 +166,6 @@ class OccurrenceDensity { } } } - long t1 = System.currentTimeMillis() //operate on current row int startRow = (row == 0) ? 0 : row + gridSize / 2 //gridSize is odd @@ -225,7 +223,6 @@ class OccurrenceDensity { bw.append("\n") } } - long end = System.currentTimeMillis() } for (int i = 0; i < threadCount; i++) { @@ -260,11 +257,11 @@ class OccurrenceDensity { } int len = (row + 1 < rowStarts.length) ? rowStarts[row + 1] : records.getRecordsSize() - for (int i = rowStarts[row]; i < len; i++) { - int y = height - 1 - (int) ((records.getSortedLatitude(i) - bbox[1]) / resolution) + for (int i = (row < rowStarts.length ? rowStarts[row] : len); i < len; i++) { + int y = height - 1 - Math.round((records.getSortedLatitude(i) - bbox[1]) / resolution) if (y == row) { - int x = (int) ((records.getSortedLongitude(i) - bbox[0]) / resolution) + int x = Math.round((records.getSortedLongitude(i) - bbox[0]) / resolution) if (x >= 0 && x < width) { counts[x]++ diff --git a/src/main/groovy/au/org/ala/spatial/process/PointsToGrid.groovy b/src/main/groovy/au/org/ala/spatial/process/PointsToGrid.groovy index 2600340c..975f5499 100644 --- a/src/main/groovy/au/org/ala/spatial/process/PointsToGrid.groovy +++ b/src/main/groovy/au/org/ala/spatial/process/PointsToGrid.groovy @@ -71,7 +71,7 @@ class PointsToGrid extends SlaveProcess { // dump the species data to a file taskLog("getting species data") - Records records = getRecords(speciesArea.bs.toString(), speciesArea.q.join('&fq='), bbox, null, null) + Records records = new Records(speciesArea.bs.toString(), speciesArea.q.join('&fq='), bbox, null, null, "names_and_lsid", false) //update bbox with spatial extent of records double minx = 180, miny = 90, maxx = -180, maxy = -90 @@ -182,10 +182,6 @@ class PointsToGrid extends SlaveProcess { } } - def getRecords(String bs, String q, double[] bbox, String filename, SimpleRegion region) { - new Records(bs, q, bbox, filename, region) - } - void writeMetadata(String filename, String title, Records records, double[] bbox, boolean odensity, boolean sdensity, int[] counts, String addAreaSqKm, String speciesName, Double gridCellSize, String movingAverage) throws IOException { SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy hh:mm:ss") FileWriter fw = new FileWriter(filename) diff --git a/src/main/groovy/au/org/ala/spatial/util/Records.groovy b/src/main/groovy/au/org/ala/spatial/util/Records.groovy index e7ad84b1..a2cf1f0f 100644 --- a/src/main/groovy/au/org/ala/spatial/util/Records.groovy +++ b/src/main/groovy/au/org/ala/spatial/util/Records.groovy @@ -13,7 +13,6 @@ import java.util.zip.GZIPInputStream * @author Adam */ @Slf4j -@CompileStatic class Records { //private static final Logger logger = log.getLogger(Records.class); @@ -36,8 +35,8 @@ class Records { init(biocache_service_url, q, bbox, filename, region, "names_and_lsid") } - Records(String biocache_service_url, String q, double[] bbox, String filename, SimpleRegion region, String facetField) throws IOException { - init(biocache_service_url, q, bbox, filename, region, facetField) + Records(String biocache_service_url, String q, double[] bbox, String filename, SimpleRegion region, String facetField, Boolean includeYear = true) throws IOException { + init(biocache_service_url, q, bbox, filename, region, facetField, includeYear) } Records(String filename) throws IOException { @@ -307,10 +306,12 @@ class Records { } } - void init(String biocache_service_url, String q, double[] bbox, String filename, SimpleRegion region, String facetField) throws IOException { + void init(String biocache_service_url, String q, double[] bbox, String filename, SimpleRegion region, String facetField, Boolean includeYear = true) throws IOException { int speciesEstimate = 250000 int recordsEstimate = 26000000 - int pageSize = 50000 + int pageSize = 300000000 // Use a large number as a workaround for paging not working. Paging was added + // for an introduced nginx timeout value so this could be considered a revert. + // The biocache-service endpoint is streaming now, except for old sandbox instances. String bboxTerm = null if (bbox != null) { @@ -336,107 +337,104 @@ class Records { if (facetField == null) { facetFieldTerm = '' } - while (start < 300000000) { - String url = biocache_service_url + "/webportal/occurrences.gz?q=" + q.replace(" ", "%20") + bboxTerm + "&pageSize=" + pageSize + "&fq=year%3A*&start=" + start + "&fl=longitude,latitude" + facetFieldTerm + ",year" - - int tryCount = 0 - InputStream is = null - CSVReader csv = null - int maxTrys = 4 - while (tryCount < maxTrys && csv == null) { - tryCount++ - try { - is = getUrlStream(url) - csv = new CSVReader(new InputStreamReader(new GZIPInputStream(is))) - } catch (Exception e) { - log.error("failed try " + tryCount + " of " + maxTrys + ": " + url, e) - } - } - if (csv == null) { - throw new IOException("failed to get records from biocache.") + String yearFq = includeYear ? '&fq=year%3A*' : '' + + // no longer using paging + String url = biocache_service_url + "/webportal/occurrences.gz?q=" + q.replace(" ", "%20") + bboxTerm + "&pageSize=" + pageSize + yearFq + "&start=" + start + "&fl=longitude,latitude" + facetFieldTerm + (includeYear ? ",year" : "") + + int tryCount = 0 + InputStream is = null + CSVReader csv = null + int maxTrys = 4 + while (tryCount < maxTrys && csv == null) { + tryCount++ + try { + is = getUrlStream(url) + csv = new CSVReader(new InputStreamReader(new GZIPInputStream(is))) + } catch (Exception e) { + log.error("failed try " + tryCount + " of " + maxTrys + ": " + url, e) } + } - String[] line - int[] header = new int[4] //to contain [0]=lsid, [1]=longitude, [2]=latitude, [3]=year - int row = start - int currentCount = 0 - while ((line = csv.readNext()) != null) { - if (raf != null) { - for (int i = 0; i < line.length; i++) { - if (i > 0) { - raf.write(",".bytes) - } - raf.write(line[i].bytes) + if (csv == null) { + throw new IOException("failed to get records from biocache.") + } + + String[] line + int[] header = new int[4] //to contain [0]=lsid, [1]=longitude, [2]=latitude, [3]=year + int row = start + int currentCount = 0 + while ((line = csv.readNext()) != null) { + if (raf != null) { + for (int i = 0; i < line.length; i++) { + if (i > 0) { + raf.write(",".bytes) } - raf.write("\n".bytes) + raf.write(line[i].bytes) } - currentCount++ - if (currentCount == 1) { - //determine header - for (int i = 0; i < line.length; i++) { - if (line[i] == facetField) { - header[0] = i - } - if (line[i] == "longitude") { - header[1] = i - } - if (line[i] == "latitude") { - header[2] = i - } - if (line[i] == "year") { - header[3] = i - } + raf.write("\n".bytes) + } + currentCount++ + if (currentCount == 1) { + //determine header + for (int i = 0; i < line.length; i++) { + if (line[i] == facetField) { + header[0] = i } - log.debug("header info:" + header[0] + "," + header[1] + "," + header[2] + "," + header[3]) - } else { - if (line.length >= 3) { - try { - double longitude = Double.parseDouble(line[header[1]]) - double latitude = Double.parseDouble(line[header[2]]) - if (region == null || region.isWithin_EPSG900913(longitude, latitude)) { - points.add(longitude) - points.add(latitude) - String species = facetField == null ? 'species' : line[header[0]] - Integer idx = lsidMap.get(species) - if (idx == null) { - idx = lsidMap.size() - lsidMap.put(species, idx) - } - lsidIdx.add(idx) - years.add(Short.parseShort(line[header[3]])) + if (line[i] == "longitude") { + header[1] = i + } + if (line[i] == "latitude") { + header[2] = i + } + if (line[i] == "year") { + header[3] = i + } + } + log.debug("header info:" + header[0] + "," + header[1] + "," + header[2] + "," + header[3]) + } else { + if (line.length >= 3) { + try { + double longitude = Double.parseDouble(line[header[1]]) + double latitude = Double.parseDouble(line[header[2]]) + if (region == null || region.isWithin_EPSG900913(longitude, latitude)) { + points.add(longitude) + points.add(latitude) + String species = facetField == null ? 'species' : line[header[0]] + Integer idx = lsidMap.get(species) + if (idx == null) { + idx = lsidMap.size() + lsidMap.put(species, idx) } - } catch (Exception ignored) { - - } finally { - if (lsidIdx.size() * 2 < points.size()) { - points.remove(points.size() - 1) - points.remove(points.size() - 1) - } else if (years.size() < lsidIdx.size()) { - years.add((short) 0) + lsidIdx.add(idx) + if (includeYear) { + years.add(Short.parseShort(line[header[3]])) } } + } catch (Exception ignored) { + + } finally { + if (lsidIdx.size() * 2 < points.size()) { + points.remove(points.size() - 1) + points.remove(points.size() - 1) + } else if (years.size() < lsidIdx.size()) { + years.add((short) 0) + } } } - row++ - } - if (start == 0) { - start = row - 1 //offset for header } + row++ + } - csv.close() - is.close() + csv.close() + is.close() - if (is != null) { - try { - is.close() - } catch (Exception e) { - log.error(e.getMessage(), e) - } - } - - if (currentCount == 0 || currentCount < pageSize) { - break + if (is != null) { + try { + is.close() + } catch (Exception e) { + log.error(e.getMessage(), e) } } @@ -511,7 +509,7 @@ class Records { @Override int compare(Integer o1, Integer o2) { - return (h - 1 - ((int) ((points.get(o1) - mLat) / res)))-(h - 1 - ((int) ((points.get(o2) - mLat) / res))) + return (h - 1 - (Math.round((points.get(o1) - mLat) / res)))-(h - 1 - (Math.round((points.get(o2) - mLat) / res))) } }) @@ -519,7 +517,7 @@ class Records { int[] rowStarts = new int[height] int row = 0 for (int i = 0; i < sortOrder.length; i++) { - int thisRow = (h - 1 - (int) ((points.get(sortOrder[i]) - mLat) / res)) + int thisRow = (h - 1 - (int) Math.round((points.get(sortOrder[i]) - mLat) / res)) //handle overflow if (thisRow >= height) { @@ -578,10 +576,10 @@ class Records { @Override int compare(Integer o1, Integer o2) { - int v = ((int) ((points.get(o1) - mLat) / res))-((int) ((points.get(o2) - mLat) / res)) + int v = (int) (Math.round((points.get(o1) - mLat) / res) - Math.round((points.get(o2) - mLat) / res)) if (v == 0) { - return ((int) ((points.get(o1 - 1) - mLong) / res))-((int) ((points.get(o2 - 1) - mLong) / res)) + return (int) (Math.round((points.get(o1 - 1) - mLong) / res) - Math.round((points.get(o2 - 1) - mLong) / res)) } else { return v } From c4194bd5096695eeae491c5b2286b9e8640f14e6 Mon Sep 17 00:00:00 2001 From: Adam Collins Date: Tue, 28 Jan 2025 12:48:30 +1000 Subject: [PATCH 2/2] fix tests --- .../groovy/au/org/ala/spatial/process/PointsToGrid.groovy | 6 +++++- .../au/org/ala/spatial/process/PointsToGridSpec.groovy | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/main/groovy/au/org/ala/spatial/process/PointsToGrid.groovy b/src/main/groovy/au/org/ala/spatial/process/PointsToGrid.groovy index 975f5499..e1237321 100644 --- a/src/main/groovy/au/org/ala/spatial/process/PointsToGrid.groovy +++ b/src/main/groovy/au/org/ala/spatial/process/PointsToGrid.groovy @@ -71,7 +71,7 @@ class PointsToGrid extends SlaveProcess { // dump the species data to a file taskLog("getting species data") - Records records = new Records(speciesArea.bs.toString(), speciesArea.q.join('&fq='), bbox, null, null, "names_and_lsid", false) + Records records = getRecords(speciesArea.bs.toString(), speciesArea.q.join('&fq='), bbox) //update bbox with spatial extent of records double minx = 180, miny = 90, maxx = -180, maxy = -90 @@ -218,4 +218,8 @@ class PointsToGrid extends SlaveProcess { fw.close() } + // Isolate method for mocking + def getRecords(String bs, String q, double[] bbox) { + return new Records(bs, q, bbox, null, null, "names_and_lsid", false) + } } diff --git a/src/test/groovy/au/org/ala/spatial/process/PointsToGridSpec.groovy b/src/test/groovy/au/org/ala/spatial/process/PointsToGridSpec.groovy index 3d65a512..9d046013 100644 --- a/src/test/groovy/au/org/ala/spatial/process/PointsToGridSpec.groovy +++ b/src/test/groovy/au/org/ala/spatial/process/PointsToGridSpec.groovy @@ -74,7 +74,7 @@ class PointsToGridSpec extends Specification implements GrailsUnitTest { } @Override - def getRecords(String bs, String q, double[] bbox, String filename, SimpleRegion region) { + def getRecords(String bs, String q, double[] bbox) { new RecordsMock(tmpCsv) } }