Dataflow: Geolocation decoding for streaming pipelines
Many of today’s mobile applications collect location information in the form of latitude and longitude. There are many tools and libraries that can convert a geographic point into a city or region, but integrating them into a streaming pipeline is not an easy task. Google Dataflow is well suited to streaming data into BigQuery, and because its programming model lets you write the pipeline in a JVM language (Kotlin in this case), it is possible to use common libraries such as GeoTools to convert the location into a city or country name at ingestion time and make this information available immediately.
The solution overview
Finding a city by testing the point against the polygon of every major city in the world is expensive. That is not practical in a streaming pipeline, where processing each message must be as light as possible; on the other hand, the location does not need to be very precise. Instead, the problem was broken down into two parts:
- first, find the region and the country for a geographic point;
- then, among the cities of that region, find the closest one.
This solution is much more efficient; see the Benchmark section for performance.
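The two-step idea can be sketched without GeoTools. In this minimal sketch the generic locate function and its parameters are hypothetical: regionOf stands in for the shapefile lookup and distanceKm for the geodetic distance. Only the cities of the matched region are compared, which is what keeps the per-message cost low.

```kotlin
// Minimal sketch of the two-step lookup; names are hypothetical, not the article's API.
// Step 1: resolve the region of the point (polygon lookup in the real pipeline).
// Step 2: compare the point only with the cities of that region and keep the closest.
fun <C> locate(
    lat: Double,
    lon: Double,
    regionOf: (Double, Double) -> Int?,        // stand-in for the shapefile lookup
    citiesByRegion: Map<Int, List<C>>,         // region id -> cities of that region
    distanceKm: (Double, Double, C) -> Double  // stand-in for the geodetic distance
): C? {
    val regionId = regionOf(lat, lon) ?: return null
    val candidates = citiesByRegion[regionId] ?: return null
    return candidates.minByOrNull { distanceKm(lat, lon, it) }
}
```

With a few hundred cities per region at most, the second step is a short linear scan instead of a search over every city in the world.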
The region and country map is taken from www.naturalearthdata.com, and the city list was filtered to contain all cities with more than 100k inhabitants, while keeping at least one city per region.
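The filtering rule for the city list (more than 100k inhabitants, but never an empty region) could look like the sketch below. The RawCity type and its population field are assumptions for illustration, since the source data's columns are not described here. For a region with no city above the threshold, this sketch keeps its single most populous city.

```kotlin
// Hypothetical input record; the real source data and column names may differ.
data class RawCity(val regionId: Int, val name: String, val population: Long)

// Keep every city above the threshold; if a region has none, keep its largest city
// so that every region still has at least one candidate for the nearest-city search.
fun filterCities(all: List<RawCity>, threshold: Long = 100_000): List<RawCity> =
    all.groupBy { it.regionId }
        .flatMap { (_, regionCities) ->
            val big = regionCities.filter { it.population > threshold }
            if (big.isNotEmpty()) big
            else listOfNotNull(regionCities.maxByOrNull { it.population })
        }
```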
TL;DR
The full example of finding the city, region and country from latitude and longitude in a streaming pipeline is available on GitHub.
The implementation
The pipeline is implemented in Kotlin.
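First, add the following dependencies to your project (build.gradle.kts):
// GeoTools is published to the OSGeo repository, not Maven Central, so add it to repositories:
// maven { url = uri("https://repo.osgeo.org/repository/release/") }
// geotools
implementation("org.geotools:gt-shapefile:25.2")
// csv
implementation("com.github.doyaaaaaken:kotlin-csv-jvm:1.1.0")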
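Next, let’s define the main class, together with the imports used by the snippets in this article:
import com.github.doyaaaaaken.kotlincsv.dsl.csvReader
import com.google.api.services.bigquery.model.TableRow
import java.io.IOException
import org.geotools.data.DataStoreFinder
import org.geotools.data.collection.SpatialIndexFeatureCollection
import org.geotools.factory.CommonFactoryFinder
import org.geotools.referencing.GeodeticCalculator
import org.locationtech.jts.geom.Coordinate
import org.locationtech.jts.geom.Geometry
import org.locationtech.jts.geom.GeometryFactory
import org.opengis.feature.simple.SimpleFeature

class CityLocator {
    data class City(
        val name: String, val country: String, val state: String,
        val lat: Double, val lon: Double
    )
    // companion object { ... }: the data and all the functions below go inside it
}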
Load the map and the list of cities in the companion object, so they are initialized only once per JVM, the first time the class is used:
companion object {
    val filterFactory = CommonFactoryFinder.getFilterFactory2()
    val geometryFactory = GeometryFactory()

    // Natural Earth admin-1 (states/provinces) polygons, kept in memory with a spatial index.
    // The .shp must be shipped together with its .shx and .dbf companion files.
    val statesCollection: SpatialIndexFeatureCollection = run {
        val statesShapeFile =
            CityLocator::class.java.getResource("/ne_10m_admin_1_states_provinces.shp")
                ?: throw IOException("shapefile not found on the classpath")
        val params = hashMapOf<String, Any>("url" to statesShapeFile)
        val ds = DataStoreFinder.getDataStore(params)
            ?: throw IOException("couldn't open $statesShapeFile")
        SpatialIndexFeatureCollection(ds.getFeatureSource(ds.names[0]).features)
    }

    // region id (ne_id) -> cities of that region
    val cities: Map<Int, List<City>> = run {
        val result = mutableMapOf<Int, MutableList<City>>()
        val csv = CityLocator::class.java.getResourceAsStream("/worldcities_final.csv")
            ?: throw IOException("city list not found on the classpath")
        csvReader().open(csv) {
            // columns: ne_id,iso_a2,state,city,lat,lon
            readAllWithHeaderAsSequence().forEach { row: Map<String, String> ->
                result.getOrPut(row.getValue("ne_id").toInt()) { mutableListOf() }.add(
                    City(
                        name = row.getValue("city"),
                        country = row.getValue("iso_a2"),
                        state = row.getValue("state"),
                        lat = row.getValue("lat").toDouble(),
                        lon = row.getValue("lon").toDouble()
                    )
                )
            }
        }
        result
    }
}
For the city list, we build a map where the key is the region id (ne_id) and the value is the list of cities in that region.
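Because every row carries its region id, the same map can also be built with groupBy. This sketch parses an inline CSV string with a naive comma split instead of kotlin-csv, so it assumes a header line and no quoted fields or embedded commas; the loadCities name is hypothetical, and the column order follows the comment in the loader (ne_id,iso_a2,state,city,lat,lon).

```kotlin
data class City(val name: String, val country: String, val state: String, val lat: Double, val lon: Double)

// Naive parser for illustration: assumes a header line and no quoted fields.
fun loadCities(csv: String): Map<Int, List<City>> =
    csv.lineSequence()
        .drop(1)                       // skip the header
        .filter { it.isNotBlank() }
        .map { it.split(',') }
        .groupBy(
            keySelector = { cols -> cols[0].trim().toInt() },  // ne_id = region id
            valueTransform = { cols ->
                // ne_id,iso_a2,state,city,lat,lon
                City(cols[3], cols[1], cols[2], cols[4].toDouble(), cols[5].toDouble())
            }
        )
```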
Next comes the lookup function. With GeoTools we can search the collection for the feature that intersects a geometry and read its attributes:
// Returns the first region whose polygon intersects the geometry, or null.
private fun lookup(geometry: Geometry): SimpleFeature? {
    val filter = filterFactory.intersects(
        filterFactory.property("the_geom"),
        filterFactory.literal(geometry)
    )
    val itr = statesCollection.subCollection(filter).features()
    try {
        return if (itr.hasNext()) itr.next() else null
    } finally {
        itr.close() // always release the iterator
    }
}
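For a point, the intersects filter amounts to a point-in-polygon test, and the spatial index lets GeoTools skip regions whose bounds cannot contain the point. The sketch below is not how GeoTools implements it: it swaps in a plain bounding-box pre-check plus the classic even-odd ray-casting test, with a hypothetical Region type and lookupRegion function. Points exactly on an edge may go either way here, whereas the intersects predicate counts the boundary as intersecting.

```kotlin
// Hypothetical region: a single outer ring given as parallel lon/lat arrays (no holes).
class Region(val id: Int, private val lons: DoubleArray, private val lats: DoubleArray) {
    init {
        require(lons.size == lats.size && lons.size >= 3) { "need at least 3 vertices" }
    }

    // Bounding box used as a cheap pre-filter before the exact test.
    private val minLon = lons.reduce { a, b -> minOf(a, b) }
    private val maxLon = lons.reduce { a, b -> maxOf(a, b) }
    private val minLat = lats.reduce { a, b -> minOf(a, b) }
    private val maxLat = lats.reduce { a, b -> maxOf(a, b) }

    fun contains(lon: Double, lat: Double): Boolean {
        if (lon < minLon || lon > maxLon || lat < minLat || lat > maxLat) return false
        // Even-odd rule: toggle for every edge crossed by a ray going east from the point.
        var inside = false
        var j = lons.size - 1
        for (i in lons.indices) {
            val crosses = (lats[i] > lat) != (lats[j] > lat) &&
                lon < (lons[j] - lons[i]) * (lat - lats[i]) / (lats[j] - lats[i]) + lons[i]
            if (crosses) inside = !inside
            j = i
        }
        return inside
    }
}

// Returns the first region containing the point, like lookup() returning the first feature.
fun lookupRegion(regions: List<Region>, lon: Double, lat: Double): Region? =
    regions.firstOrNull { it.contains(lon, lat) }
```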
To find the region, we first search for the point itself; if no region contains it (for example, a point that falls just outside a coastline polygon), we search again with a small square polygon around it.
// Returns (region name, ISO country code, region id), or ("", "", 0) if nothing is found.
private fun getRegion(lat: Double, lon: Double): Triple<String, String, Int> {
    // JTS coordinates are (x, y) = (lon, lat)
    val point = geometryFactory.createPoint(Coordinate(lon, lat))
    val feature = lookup(point) ?: run {
        // no region contains the point: retry with a square of +/- margin degrees
        val margin = 0.1
        lookup(
            geometryFactory.createPolygon(
                arrayOf(
                    Coordinate(lon - margin, lat - margin),
                    Coordinate(lon - margin, lat + margin),
                    Coordinate(lon + margin, lat + margin),
                    Coordinate(lon + margin, lat - margin),
                    Coordinate(lon - margin, lat - margin) // close the ring
                )
            )
        )
    } ?: return Triple("", "", 0)

    // The DBF text is read as ISO-8859-1, but the names are UTF-8:
    // recover the original bytes and decode them as UTF-8.
    val rawName = feature.getAttribute("name") as String
    val name = String(rawName.toByteArray(Charsets.ISO_8859_1), Charsets.UTF_8)
    return Triple(
        name,
        feature.getAttribute("iso_a2").toString(),
        feature.getAttribute("ne_id").toString().toInt()
    )
}
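The fallback square is ±0.1 degrees around the point, so its real size depends on latitude: a degree of latitude is roughly the same length everywhere, while a degree of longitude shrinks with the cosine of the latitude. This small sketch estimates the reach of the box on a sphere of mean radius 6371 km (a spherical approximation, assumed here for simplicity); marginToKm is a hypothetical helper for choosing the margin.

```kotlin
import kotlin.math.PI
import kotlin.math.cos

// Approximate half-width (east-west) and half-height (north-south) in km
// of a box spanning +/- marginDeg around a point at the given latitude.
fun marginToKm(latDeg: Double, marginDeg: Double): Pair<Double, Double> {
    val earthRadiusKm = 6371.0                        // mean radius, spherical approximation
    val kmPerDegree = 2 * PI * earthRadiusKm / 360.0  // about 111.2 km per degree of arc
    val halfHeight = marginDeg * kmPerDegree
    val halfWidth = halfHeight * cos(latDeg * PI / 180.0)
    return halfWidth to halfHeight
}
```

With the 0.1 degree margin used above, the box reaches about 11 km north and south of the point, and about half that east and west at 60° latitude.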
Once we have found the region, the next step is to find the nearest city among that region’s cities:
// Returns the city closest to (lat, lon), or null if the list is empty.
private fun getNearestCity(lat: Double, lon: Double, cities: List<City>): City? {
    // GeodeticCalculator defaults to WGS84; distances are in meters on the ellipsoid
    val gc = GeodeticCalculator()
    // the starting point is the message location and does not change inside the loop
    gc.setStartingGeographicPoint(lon, lat)
    return cities.minByOrNull { city ->
        gc.setDestinationGeographicPoint(city.lon, city.lat)
        gc.orthodromicDistance
    }
}
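GeodeticCalculator computes the distance on the WGS84 ellipsoid. If you want to keep this step free of GeoTools, a common substitute is the haversine formula on a sphere, typically within about 0.5% of the ellipsoidal distance, which is enough to pick the closest city. This is a swapped-in technique, not what the code above uses; City is redeclared and nearestCity is a hypothetical name so that the sketch is self-contained.

```kotlin
import kotlin.math.asin
import kotlin.math.cos
import kotlin.math.sin
import kotlin.math.sqrt

data class City(val name: String, val country: String, val state: String, val lat: Double, val lon: Double)

// Great-circle distance in km on a sphere of mean radius 6371 km (haversine formula).
fun haversineKm(lat1: Double, lon1: Double, lat2: Double, lon2: Double): Double {
    val dLat = Math.toRadians(lat2 - lat1)
    val dLon = Math.toRadians(lon2 - lon1)
    val a = sin(dLat / 2) * sin(dLat / 2) +
        cos(Math.toRadians(lat1)) * cos(Math.toRadians(lat2)) * sin(dLon / 2) * sin(dLon / 2)
    // clamp guards against rounding pushing a slightly above 1 for antipodal points
    return 2 * 6371.0 * asin(sqrt(minOf(1.0, a)))
}

// Same contract as getNearestCity: null only when the list is empty.
fun nearestCity(lat: Double, lon: Double, cities: List<City>): City? =
    cities.minByOrNull { haversineKm(lat, lon, it.lat, it.lon) }
```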
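Finally, wrap everything into a single public function in the companion object, so it can be called as CityLocator.findCity (add @JvmStatic if you also call it from Java):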
// Sets country, region and city on the row.
// Returns true if a city was found (or the point was skipped), false otherwise.
fun findCity(tableRow: TableRow, lat: Double, lon: Double): Boolean {
    // (0, 0) is a valid coordinate, but it is much more likely to be an error
    // or a missing value, so it is not looked up
    if (lat == 0.0 && lon == 0.0) {
        return true
    }
    val (region, country, regionId) = getRegion(lat, lon)
    // region-level result; empty strings mean "not found" and are stored as null
    tableRow.set("country", country.ifEmpty { null })
    tableRow.set("region", region.ifEmpty { null })
    // refine with the nearest known city of the region; its country and state win
    val city = cities[regionId]?.let { getNearestCity(lat, lon, it) } ?: return false
    tableRow.set("country", city.country)
    tableRow.set("region", city.state)
    tableRow.set("city", city.name)
    return true
}
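The (0, 0) check in findCity could be extended into a small guard that also rejects missing or out-of-range values before any lookup. The isUsableLocation name and the range checks are additions for illustration, not part of the original code.

```kotlin
// Hypothetical pre-check before calling the locator.
fun isUsableLocation(lat: Double?, lon: Double?): Boolean {
    if (lat == null || lon == null) return false
    // valid WGS84 ranges; NaN fails these range checks as well
    if (lat !in -90.0..90.0 || lon !in -180.0..180.0) return false
    // (0, 0) is a valid coordinate but most likely an error or a missing value
    return !(lat == 0.0 && lon == 0.0)
}
```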
Now, in the processElement method of the main pipeline’s DoFn, you can use the class like this:
// ... (msg and tableRow come from the surrounding DoFn)
val lat = msg.get("lat").getValue<Double>()
val lon = msg.get("lon").getValue<Double>()
CityLocator.findCity(tableRow, lat, lon)
// ...
Note: For simplicity the source code of the main pipeline is not shown here.
A full working example, with instructions on how to build and run it, is available in this repo on GitHub.
Benchmark
In a real production environment, this implementation handles at least 1,000 requests per second.
Conclusion
In a big data pipeline, the ideal situation is to calculate all the fields and apply all the transformations at ingestion time. This makes the useful information available immediately and can reduce costs. Apache Beam’s programming model, which lets you use existing JVM libraries such as GeoTools inside the pipeline, is what made this possible.