Dataflow. Geolocation decoding for streaming pipelines

There is common pattern in today’s mobile applications to collect location information in the form of latitude and longitude. There are many tools and libraries that can convert a geographic point into a city or region but integration this into a streaming pipeline is not an easy task. Google Dataflow is the best tool to stream data into BigQuery, and because of the programing model, kotlin in this case, is possible to use common libraries like GeoTools to convert the location into a city or country name at the ingestion time and make this information available immediately.

The solution overview

Finding a city using polygons of each major city in the world is quite heavy and is not practical as this is a streaming pipeline, and the processing of the messages must be as light as possible and on the other hand the accuracy of a location is not that important. Instead the problem was breakdown into 2 parts:

  • first find the region and the country for a geographic point
  • then search in a list of cities for the region which one is closest one.

This solution is much more efficient. Check the benchmark section to see performance.

The region and country map is taken from www.naturalearthdata.com and the city list was filtered to contain all the cities with more then 100k population, but at least one city per region.

TL;DR

You can find the full example of how you can find the city, region and country from latitude and longitude in a streaming pipeline on GitHub.

The implementation

The implementation of this pipeline is done kotlin.

First, add some dependencies to your project

    // geotools
    implementation("org.geotools:gt-shapefile:25.2")
    // csv
    implementation("com.github.doyaaaaaken:kotlin-csv-jvm:1.1.0")

Next, let’s define our main class


class CityLocator {

  class City(
      val name: String,
      val country: String,
      val state: String,
      val lat: Double,
      val lon: Double
  ) {}

}

Load the map and the list of the cities:

companion object {
    val filterFactory = CommonFactoryFinder.getFilterFactory2()
    val geometryFactory = GeometryFactory()

    val statesCollection =
            {
                val statesShapeFile =
                        {}.javaClass.getResource("/ne_10m_admin_1_states_provinces.shp")

                val params = HashMap<String, Any>()
                params.put("url", statesShapeFile)
                val ds = DataStoreFinder.getDataStore(params)
                if (ds == null) {
                    throw IOException("couldn't open " + params.get("url"))
                }
                SpatialIndexFeatureCollection(
                        ds.getFeatureSource(ds.getNames().get(0)).getFeatures()
                )
            }()

    val cities =
            {
                val cities = mutableMapOf<Int, MutableList<City>>()

                csvReader().open({}.javaClass.getResourceAsStream("/worldcities_final.csv")) {
                    readAllWithHeaderAsSequence().forEach { row: Map<String, String> ->
                        val key = row["ne_id"]!!.toInt()
                        if (!cities.contains(key)) {
                            cities.set(key, mutableListOf<City>())
                        }
                        // ne_id,iso_a2,state,city,lat,lon
                        cities.get(key)
                                ?.add(
                                        City(
                                                row["city"]!!,
                                                row["iso_a2"]!!,
                                                row["state"]!!,
                                                row["lat"]!!.toDouble(),
                                                row["lon"]!!.toDouble()
                                        )
                                )
                    }
                }
                cities
            }()
}

For the city list we will create a map of regions ids where the key is the region id and the value is the list of the cities.

Next, is the lookup function. Using geotools we can search a point into a collection and get back a list of attributes

private fun lookup(geometry: Geometry): SimpleFeature? {

    val filter =
            filterFactory.intersects(
                    filterFactory.property("the_geom"),
                    filterFactory.literal(geometry)
            )
    val features = statesCollection.subCollection(filter)
    val itr = features.features()

    try {
        if (itr.hasNext()) {
            return itr.next()
        }
    } finally {
        itr.close()
    }
    return null
}

When we search for the region we search first for the point and then if we do not find it we will try a search by a polygon.

private fun getRegion(lat: Double, lon: Double): Triple<String, String, Int> {

    val point = geometryFactory.createPoint(Coordinate(lon, lat))
    var feature: SimpleFeature? = this.lookup(point)

    if (feature == null) {
        // if we cannot find the point let's try a polygon
        feature =
                this.lookup(
                        {
                            val margin = 0.1
                            this.geometryFactory.createPolygon(
                                    arrayOf(
                                            Coordinate(lon - margin, lat - margin),
                                            Coordinate(lon - margin, lat + margin),
                                            Coordinate(lon + margin, lat + margin),
                                            Coordinate(lon + margin, lat - margin),
                                            Coordinate(lon - margin, lat - margin)
                                    )
                            )
                        }()
                )
    }

    if (feature != null) {
        val buffer = Charsets.ISO_8859_1.encode(feature.getAttribute("name") as String)
        val utf8EncodedString = Charsets.UTF_8.decode(buffer).toString()
        return Triple(
                utf8EncodedString,
                feature.getAttribute("iso_a2").toString(),
                feature.getAttribute("ne_id").toString().toInt()
        )
    }
    return Triple("", "", 0)
}

Once we found the region the next step is to find the nearest city.

private fun getNearestCity(lat: Double, lon: Double, cities: List<City>): City? {

    val gc = GeodeticCalculator()
    val startPosition =
            JTS.toDirectPosition(Coordinate(lon, lat), gc.coordinateReferenceSystem)
    var nearestCity: City? = null
    var minDistance = Double.POSITIVE_INFINITY
    cities.forEach {
        gc.setStartingPosition(startPosition)
        gc.setDestinationPosition(
                JTS.toDirectPosition(
                        Coordinate(it.lon, it.lat),
                        gc.coordinateReferenceSystem
                )
        )
        val distance = gc.getOrthodromicDistance()
        if (nearestCity == null) {
            nearestCity = it
            minDistance = distance
        } else if (distance < minDistance) {
            nearestCity = it
            minDistance = distance
        }
    }
    return nearestCity
}

And finally warp everything into a single public static function:

fun findCity(tableRow: TableRow, lat: Double, lon: Double): Boolean {
    // even if is a valid address, we do not check for lat=0 and lon=0 due the high
    // probability to be an error or a missing value
    if (lat == 0.0 && lon == 0.0) {
        return true
    }

    val (region, country, regionid) = getRegion(lat, lon)
    tableRow.set(
            "country",
            if (country != "") {
                country
            } else {
                null
            }
    )
    tableRow.set(
            "region",
            if (country != "") {
                region
            } else {
                null
            }
    )

    val regionCities = cities[regionid]
    if (regionCities != null) {
        val city = getNearestCity(lat, lon, regionCities)
        if (city != null) {
            tableRow.set("country", city.country)
            tableRow.set("region", city.state)
            tableRow.set("city", city.name)
            return true
        }
    }
    return false
}

Now in the processElement of the main pipeline you can use the class like this:

// ...
val lat = msg.get("lat").getValue<Double>()
val lon = msg.get("log").getValue<Double>()
CityLocator.findCity(tableRow, lat, lon)

//...

Note: For simplicity the source code of the main pipeline is not shown here.

You can find a full working example with instructions on how to build and run using this repo on GitHub.

Benchmark

In a real production environment this implementation is able to handle at least 1000 requests/second.

Conclusion

When you have a big data pipeline the ideal situation would be to calculate all the fields and do all the transformations at he ingestion time. This way you can make the useful information available immediately and you can reduce costs. Thanks to the unique programing model of the Apache Beam this was possible.