#!/usr/bin/env python3
r"""Export OSM POIs (e.g. fuel stations) from a .osm.pbf to GeoJSON + CSV.

These exports feed FleetNow's toggleable map-overlay layers (see
docs/OSM_POI_EXPORT.md and the fleetnow repo's README "Map overlay layers").

No system tooling needed — run via uv so pyosmium's prebuilt wheel is fetched:

    uv run --no-project --with osmium python scripts/export_osm_pois.py \
        kenya-260605.osm.pbf --amenity fuel --brand Shell \
        --out-geojson shell_stations.geojson --out-csv shell_stations.csv

Notes:
  - Gas stations are OSM ``amenity=fuel``. Brand lives in the ``brand`` tag, but
    only ~36% of Kenyan stations carry it, so when ``--brand`` is given and a
    feature has no ``brand`` tag we fall back to matching ``name``/``operator``.
  - Omit ``--brand`` to export every feature of that amenity.
  - Nodes use their own coordinate; ways/areas use the centroid of their nodes
    (so ``locations=True`` is required on apply_file).
"""
# NOTE: the module docstring is a raw string because it doubles as the
# ``--help`` text and contains literal backslash line continuations.

import argparse
import csv
import json

# Column order of the CSV export; lat comes before lon here, unlike GeoJSON.
CSV_COLUMNS = ["name", "lat", "lon", "brand", "operator", "osm_type", "osm_id"]


def matches_filter(tags, amenity, brand_lc=None):
    """Return True when *tags* describe a POI that should be exported.

    Args:
        tags: Mapping-like object with a ``get(key)`` method (an osmium
            ``TagList`` or a plain ``dict``).
        amenity: Required value of the ``amenity`` tag (exact match).
        brand_lc: Lower-cased brand substring to look for, or ``None`` to
            accept every feature of the amenity.

    The ``brand`` tag wins when present and non-empty; only features without
    one fall back to a substring match on ``name`` or ``operator``.
    """
    if tags.get("amenity") != amenity:
        return False
    if brand_lc is None:
        return True
    brand = (tags.get("brand") or "").lower()
    if brand:
        return brand_lc in brand
    return any(
        brand_lc in (tags.get(key) or "").lower() for key in ("name", "operator")
    )


def build_feature(lon, lat, tags, kind, oid):
    """Build one GeoJSON Point feature for an OSM object.

    Tag values are read out immediately, so the returned dict holds plain
    values and no reference to *tags* itself. Coordinates are rounded to
    6 decimal places and stored in GeoJSON order (lon, lat).
    """
    return {
        "type": "Feature",
        "properties": {
            "name": tags.get("name"),
            "brand": tags.get("brand"),
            "operator": tags.get("operator"),
            "osm_type": kind,
            "osm_id": oid,
        },
        "geometry": {
            "type": "Point",
            "coordinates": [round(lon, 6), round(lat, 6)],
        },
    }


def centroid(points):
    """Return the arithmetic mean ``(lon, lat)`` of *points*, or None if empty.

    *points* is a sequence of ``(lon, lat)`` pairs. This is a plain vertex
    average, not an area-weighted polygon centroid.
    """
    if not points:
        return None
    count = len(points)
    return (
        sum(lon for lon, _ in points) / count,
        sum(lat for _, lat in points) / count,
    )


def write_geojson(path, feats):
    """Write *feats* to *path* as a GeoJSON FeatureCollection (UTF-8)."""
    # ``with`` guarantees the file is flushed and closed before we report
    # success; the encoding is pinned so output does not vary by platform.
    with open(path, "w", encoding="utf-8") as f:
        json.dump({"type": "FeatureCollection", "features": feats}, f)


def write_csv(path, feats):
    """Write *feats* to *path* as CSV with a header row (UTF-8)."""
    # An explicit encoding keeps non-ASCII names from failing (or being
    # mis-encoded) under a non-UTF-8 locale default.
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(CSV_COLUMNS)
        for feat in feats:
            props = feat["properties"]
            lon, lat = feat["geometry"]["coordinates"]
            writer.writerow([
                props["name"],
                lat,
                lon,
                props["brand"],
                props["operator"],
                props["osm_type"],
                props["osm_id"],
            ])


def main(argv=None):
    """Parse arguments, scan the .osm.pbf, and write the GeoJSON + CSV exports.

    Args:
        argv: Optional argument list; defaults to ``sys.argv[1:]`` when None.
    """
    ap = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    ap.add_argument("pbf", help="input .osm.pbf")
    ap.add_argument("--amenity", default="fuel", help="OSM amenity value (default: fuel)")
    ap.add_argument("--brand", default=None, help="case-insensitive brand/name match; omit for all")
    ap.add_argument("--out-geojson", default="pois.geojson")
    ap.add_argument("--out-csv", default="pois.csv")
    args = ap.parse_args(argv)

    # Imported after argument parsing so ``--help`` works (and the helpers
    # above stay importable) even when pyosmium is not installed.
    import osmium

    brand_lc = args.brand.lower() if args.brand else None
    feats = []

    class PoiHandler(osmium.SimpleHandler):
        """Collect matching nodes and ways as GeoJSON features."""

        def node(self, o):
            if matches_filter(o.tags, args.amenity, brand_lc):
                feats.append(
                    build_feature(o.location.lon, o.location.lat, o.tags, "node", o.id)
                )

        def way(self, o):
            if not matches_filter(o.tags, args.amenity, brand_lc):
                return
            # A closed way (area) repeats its first node as its last one;
            # count each node ref once so that vertex does not bias the mean.
            seen_refs = set()
            points = []
            for n in o.nodes:
                if n.ref in seen_refs:
                    continue
                seen_refs.add(n.ref)
                try:
                    if n.location.valid():
                        points.append((n.location.lon, n.location.lat))
                except osmium.InvalidLocationError:
                    # Node missing from the extract: best-effort, skip it.
                    continue
            center = centroid(points)
            if center is not None:
                lon, lat = center
                feats.append(build_feature(lon, lat, o.tags, "way", o.id))

    # locations=True makes osmium resolve way-node coordinates for us.
    PoiHandler().apply_file(args.pbf, locations=True)

    write_geojson(args.out_geojson, feats)
    write_csv(args.out_csv, feats)
    print(f"exported {len(feats)} features -> {args.out_geojson}, {args.out_csv}")


if __name__ == "__main__":
    main()