Skip to content

Commit

Permalink
Merge pull request #850 from lonvia/export-by-country
Browse files Browse the repository at this point in the history
Add caching of address information for Nominatim export
  • Loading branch information
lonvia authored Nov 15, 2024
2 parents 27cc937 + 23c18ef commit 42e0c39
Show file tree
Hide file tree
Showing 22 changed files with 849 additions and 506 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package de.komoot.photon.opensearch;

import de.komoot.photon.searcher.PhotonResult;
import jakarta.json.JsonArray;
import org.json.JSONObject;

import java.util.Map;
Expand Down
100 changes: 87 additions & 13 deletions src/main/java/de/komoot/photon/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

import com.beust.jcommander.JCommander;
import com.beust.jcommander.ParameterException;
import de.komoot.photon.nominatim.NominatimConnector;
import de.komoot.photon.nominatim.ImportThread;
import de.komoot.photon.nominatim.NominatimImporter;
import de.komoot.photon.nominatim.NominatimUpdater;
import de.komoot.photon.searcher.ReverseHandler;
import de.komoot.photon.searcher.SearchHandler;
Expand All @@ -14,7 +15,8 @@

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Date;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;

import static spark.Spark.*;

Expand Down Expand Up @@ -107,9 +109,8 @@ private static void startJsonDump(CommandLineArgs args) {
try {
final String filename = args.getJsonDump();
final JsonDumper jsonDumper = new JsonDumper(filename, args.getLanguages(), args.getExtraTags());
NominatimConnector nominatimConnector = new NominatimConnector(args.getHost(), args.getPort(), args.getDatabase(), args.getUser(), args.getPassword());
nominatimConnector.setImporter(jsonDumper);
nominatimConnector.readEntireDatabase(args.getCountryCodes());

importFromDatabase(args, jsonDumper);
LOGGER.info("Json dump was created: {}", filename);
} catch (FileNotFoundException e) {
throw new UsageException("Cannot create dump: " + e.getMessage());
Expand All @@ -121,22 +122,95 @@ private static void startJsonDump(CommandLineArgs args) {
* Read all data from a Nominatim database and import it into a Photon database.
*/
private static void startNominatimImport(CommandLineArgs args, Server esServer) {
DatabaseProperties dbProperties;
NominatimConnector nominatimConnector = new NominatimConnector(args.getHost(), args.getPort(), args.getDatabase(), args.getUser(), args.getPassword());
Date importDate = nominatimConnector.getLastImportDate();
final var languages = initDatabase(args, esServer);

LOGGER.info("Starting import from nominatim to photon with languages: {}", String.join(",", languages));
importFromDatabase(args, esServer.createImporter(languages, args.getExtraTags()));

LOGGER.info("Imported data from nominatim to photon with languages: {}", String.join(",", languages));
}

private static String[] initDatabase(CommandLineArgs args, Server esServer) {
final var nominatimConnector = new NominatimImporter(args.getHost(), args.getPort(), args.getDatabase(), args.getUser(), args.getPassword());
final Date importDate = nominatimConnector.getLastImportDate();

try {
dbProperties = esServer.recreateIndex(args.getLanguages(), importDate, args.getSupportStructuredQueries()); // clear out previous data
// Clear out previous data.
var dbProperties = esServer.recreateIndex(args.getLanguages(), importDate, args.getSupportStructuredQueries());
return dbProperties.getLanguages();
} catch (IOException e) {
throw new UsageException("Cannot setup index, elastic search config files not readable");
}
}

private static void importFromDatabase(CommandLineArgs args, Importer importer) {
final var connector = new NominatimImporter(args.getHost(), args.getPort(), args.getDatabase(), args.getUser(), args.getPassword());
connector.prepareDatabase();
connector.loadCountryNames();

LOGGER.info("Starting import from nominatim to photon with languages: {}", String.join(",", dbProperties.getLanguages()));
nominatimConnector.setImporter(esServer.createImporter(dbProperties.getLanguages(), args.getExtraTags()));
nominatimConnector.readEntireDatabase(args.getCountryCodes());
String[] countries = args.getCountryCodes();

if (countries == null || countries.length == 0) {
countries = connector.getCountriesFromDatabase();
} else {
countries = Arrays.stream(countries).map(String::trim).filter(s -> !s.isBlank()).toArray(String[]::new);
}

final int numThreads = args.getThreads();
ImportThread importThread = new ImportThread(importer);

try {

if (numThreads == 1) {
for (var country : countries) {
connector.readCountry(country, importThread);
}
} else {
final Queue<String> todolist = new ConcurrentLinkedQueue<>(List.of(countries));

final List<Thread> readerThreads = new ArrayList<>(numThreads);

for (int i = 0; i < numThreads; ++i) {
final NominatimImporter threadConnector;
if (i > 0) {
threadConnector = new NominatimImporter(args.getHost(), args.getPort(), args.getDatabase(), args.getUser(), args.getPassword());
threadConnector.loadCountryNames();
} else {
threadConnector = connector;
}
final int threadno = i;
Runnable runner = () -> {
String nextCc = todolist.poll();
while (nextCc != null) {
LOGGER.info("Thread {}: reading country '{}'", threadno, nextCc);
threadConnector.readCountry(nextCc, importThread);
nextCc = todolist.poll();
}
};
Thread thread = new Thread(runner);
thread.start();
readerThreads.add(thread);
}
readerThreads.forEach(t -> {
while (true) {
try {
t.join();
break;
} catch (InterruptedException e) {
LOGGER.warn("Thread interrupted:", e);
// Restore interrupted state.
Thread.currentThread().interrupt();
}
}
});
}
} finally {
importThread.finish();
}

LOGGER.info("Imported data from nominatim to photon with languages: {}", String.join(",", dbProperties.getLanguages()));
}


private static void startNominatimUpdateInit(CommandLineArgs args) {
NominatimUpdater nominatimUpdater = new NominatimUpdater(args.getHost(), args.getPort(), args.getDatabase(), args.getUser(), args.getPassword());
nominatimUpdater.initUpdates(args.getNominatimUpdateInit());
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/de/komoot/photon/CommandLineArgs.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
@Parameters(parametersValidators = CorsMutuallyExclusiveValidator.class)
public class CommandLineArgs {

@Parameter(names = "-j", description = "Number of threads to use for import.")
private int threads = 1;

@Parameter(names = "-structured", description = "Enable support for structured queries.")
private boolean supportStructuredQueries = false;

Expand Down Expand Up @@ -107,6 +110,10 @@ public String[] getLanguages() {
return getLanguages(true);
}

public int getThreads() {
return Integer.min(10, Integer.max(0, threads));
}

public String getCluster() {
return this.cluster;
}
Expand Down
32 changes: 30 additions & 2 deletions src/main/java/de/komoot/photon/PhotonDoc.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package de.komoot.photon;

import de.komoot.photon.nominatim.model.AddressRow;
import org.locationtech.jts.geom.Envelope;
import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.geom.Point;
Expand Down Expand Up @@ -217,17 +218,27 @@ public boolean isUsefulForIndex() {
private void extractAddress(Map<String, String> address, AddressType addressType, String addressFieldName) {
String field = address.get(addressFieldName);

if (field != null) {
Map<String, String> map = addressParts.computeIfAbsent(addressType, k -> new HashMap<>());
if (field == null) {
return;
}

Map<String, String> map = addressParts.get(addressType);
if (map == null) {
map = new HashMap<>();
map.put("name", field);
addressParts.put(addressType, map);
} else {
String existingName = map.get("name");
if (!field.equals(existingName)) {
// Make a copy of the original name map because the map is reused for other addresses.
map = new HashMap<>(map);
LOGGER.debug("Replacing {} name '{}' with '{}' for osmId #{}", addressFieldName, existingName, field, osmId);
// we keep the former name in the context as it might be helpful when looking up typos
if (!Objects.isNull(existingName)) {
context.add(Collections.singletonMap("formerName", existingName));
}
map.put("name", field);
addressParts.put(addressType, map);
}
}
}
Expand All @@ -241,6 +252,23 @@ public boolean setAddressPartIfNew(AddressType addressType, Map<String, String>
return addressParts.computeIfAbsent(addressType, k -> names) == names;
}

/**
* Complete address data from a list of address rows.
*/
public void completePlace(List<AddressRow> addresses) {
final AddressType doctype = getAddressType();
for (AddressRow address : addresses) {
final AddressType atype = address.getAddressType();

if (atype != null
&& (atype == doctype || !setAddressPartIfNew(atype, address.getName()))
&& address.isUsefulForContext()) {
// no specifically handled item, check if useful for context
getContext().add(address.getName());
}
}
}

public void setCountry(Map<String, String> names) {
addressParts.put(AddressType.COUNTRY, names);
}
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/de/komoot/photon/nominatim/DBDataAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,9 @@ public interface DBDataAdapter {
* Wrap a DELETE statement with a RETURNING clause.
*/
String deleteReturning(String deleteSQL, String columns);

/**
* Wrap function to create a json array from a SELECT.
*/
String jsonArrayFromSelect(String valueSQL, String fromSQL);
}
9 changes: 5 additions & 4 deletions src/main/java/de/komoot/photon/nominatim/ImportThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
/**
* Worker thread for bulk importing data from a Nominatim database.
*/
class ImportThread {
public class ImportThread {
private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(ImportThread.class);

private static final int PROGRESS_INTERVAL = 50000;
private static final NominatimResult FINAL_DOCUMENT = new NominatimResult(new PhotonDoc(0, null, 0, null, null));
private final BlockingQueue<NominatimResult> documents = new LinkedBlockingDeque<>(20);
private static final NominatimResult FINAL_DOCUMENT = NominatimResult.fromAddress(new PhotonDoc(0, null, 0, null, null), null);
private final BlockingQueue<NominatimResult> documents = new LinkedBlockingDeque<>(100);
private final AtomicLong counter = new AtomicLong();
private final Importer importer;
private final Thread thread;
Expand Down Expand Up @@ -70,7 +70,8 @@ public void finish() {
Thread.currentThread().interrupt();
}
}
LOGGER.info("Finished import of {} photon documents.", counter.longValue());
LOGGER.info("Finished import of {} photon documents. (Total processing time: {}s)",
counter.longValue(), (System.currentTimeMillis() - startMillis)/1000);
}

private class ImportRunnable implements Runnable {
Expand Down
Loading

0 comments on commit 42e0c39

Please sign in to comment.