Source code for gtkit.gpdops

from typing import Tuple

import geopandas as gpd
from shapely.geometry import box
from sklearn.cluster import AgglomerativeClustering

from gtkit.gops import centroid_with_z, midpoint


def gpdread(inp_file: str) -> gpd.GeoDataFrame:
    """
    Load a vector dataset from disk into a GeoDataFrame.

    Args:
        inp_file (str): Path of the file to load.

    Returns:
        gpd.GeoDataFrame: The result of ``geopandas.read_file`` for that path.
    """
    frame = gpd.read_file(inp_file)
    return frame
def nearest_linestring_to_a_point_df(
    pts: gpd.GeoDataFrame, lines: gpd.GeoDataFrame
) -> gpd.GeoDataFrame:
    """
    Pair every point with its nearest linestring and measure the gap.

    Both inputs are reprojected to the UTM CRS estimated from ``pts`` before
    the nearest-neighbour join, so the computed distance is expressed in the
    units of that CRS.

    Args:
        pts (gpd.GeoDataFrame): Frame holding the point geometries.
        lines (gpd.GeoDataFrame): Frame holding the linestring geometries.

    Returns:
        gpd.GeoDataFrame: The nearest-join result merged with the matching
        line rows (point geometry in ``geometry_x``, line geometry in
        ``geometry_y``) plus a ``distance`` column.
    """
    # Approach taken from:
    # https://stackoverflow.com/questions/70626218/how-to-find-the-nearest-linestring-to-a-point
    utm_crs = pts.estimate_utm_crs()
    projected_pts = pts.to_crs(utm_crs)
    projected_lines = lines.to_crs(projected_pts.crs)

    nearest = gpd.sjoin_nearest(projected_pts, projected_lines)
    # Re-attach the matched line rows so both geometries sit side by side.
    joined = nearest.merge(
        projected_lines, left_on="index_right", right_index=True
    )

    def _row_distance(record):
        return record["geometry_x"].distance(record["geometry_y"])

    joined["distance"] = joined.apply(_row_distance, axis=1)
    return joined
def points_in_a_radius_around_centroid(
    centroid: gpd.GeoDataFrame, lines: gpd.GeoDataFrame, raduis: int = 5
) -> gpd.GeoDataFrame:
    """
    Find the linestrings that come within a radius of each centroid.

    Every centroid geometry is buffered by ``raduis`` and the buffered shapes
    are spatially joined against ``lines`` with the ``intersects`` predicate.

    Args:
        centroid (gpd.GeoDataFrame): Frame holding the centroid geometries.
        lines (gpd.GeoDataFrame): Frame holding the linestrings to search.
        raduis (int, optional): Buffer distance passed to ``buffer``.
            Defaults to 5. The misspelled parameter name is kept so existing
            keyword callers keep working.

    Returns:
        gpd.GeoDataFrame: One row per (buffered centroid, intersecting line)
        pair, carrying the centroid attributes and the joined line columns.
    """
    # https://gis.stackexchange.com/questions/246782/geopandas-line-polygon-intersection
    # ``buffer`` yields a GeoSeries but ``sjoin`` only accepts GeoDataFrames,
    # so swap the buffered shapes in as the geometry of a copy of ``centroid``.
    buffered = centroid.set_geometry(centroid.buffer(raduis))
    # ``predicate`` replaces the ``op`` keyword, which newer geopandas removed.
    return gpd.sjoin(buffered, lines, predicate="intersects")
def get_centroid3d(geom_df: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    """
    Replace each row's geometry with the result of ``centroid_with_z``.

    The input frame is modified in place: its ``geometry`` column is
    overwritten and the same object is returned.

    Args:
        geom_df (gpd.GeoDataFrame): Frame whose rows expose either a ``geom``
            or a ``geometry`` field.

    Returns:
        gpd.GeoDataFrame: ``geom_df`` with the ``geometry`` column replaced.
    """

    def _centroid_of(row):
        # Prefer a "geom" field when the row has one, else use "geometry".
        source = row.geom if "geom" in row else row.geometry
        # presumably returns a centroid that keeps the Z value; see gtkit.gops
        return centroid_with_z(source)

    geom_df["geometry"] = geom_df.apply(_centroid_of, axis=1)
    return geom_df
def get_centroid(geom_df: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    """
    Store the centroid of every row's geometry in a ``midpoint`` column.

    The input frame is modified in place and returned.

    Args:
        geom_df (gpd.GeoDataFrame): Frame with a ``geometry`` column.

    Returns:
        gpd.GeoDataFrame: ``geom_df`` with the ``midpoint`` column added.
    """

    def _row_centroid(row):
        return row["geometry"].centroid

    geom_df["midpoint"] = geom_df.apply(_row_centroid, axis=1)
    return geom_df
def get_common_columns(
    df_a: gpd.GeoDataFrame, df_b: gpd.GeoDataFrame
) -> Tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]:
    """
    Trim two frames down to the columns they have in common.

    Columns found only in ``df_b`` are dropped from it first, then columns
    found only in ``df_a`` are dropped from ``df_a``. A frame that has nothing
    to drop is returned as the very same object.

    Args:
        df_a (gpd.GeoDataFrame): The first frame.
        df_b (gpd.GeoDataFrame): The second frame.

    Returns:
        Tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]: ``(df_a, df_b)`` restricted
        to their shared columns.
    """
    only_in_b = non_overlapping_columns(df_b, df_a)
    if only_in_b:
        df_b = df_b.drop(columns=only_in_b)

    only_in_a = non_overlapping_columns(df_a, df_b)
    if only_in_a:
        df_a = df_a.drop(columns=only_in_a)

    return df_a, df_b
def non_overlapping_columns(a: gpd.GeoDataFrame, b: gpd.GeoDataFrame) -> list:
    """
    List the columns that are present in ``a`` but missing from ``b``.

    The result follows the column order of ``a`` and contains each name once,
    so it is reproducible from run to run (a plain set difference would
    return the names in hash order).

    Args:
        a (gpd.GeoDataFrame): Frame whose columns are examined.
        b (gpd.GeoDataFrame): Frame whose columns are excluded.

    Returns:
        list: Column names of ``a`` that do not occur in ``b``.
    """
    excluded = set(b.columns.tolist())
    # dict.fromkeys removes duplicate labels while keeping first-seen order.
    unique_a = dict.fromkeys(a.columns.tolist())
    return [name for name in unique_a if name not in excluded]
def project_points_to_line(
    points: gpd.GeoDataFrame, lines: gpd.GeoDataFrame
) -> gpd.GeoDataFrame:
    """
    Project each point onto ``lines`` and store the result in ``PROJECTED``.

    For every row of ``points`` the point is projected onto ``lines`` and the
    resulting distance is interpolated back along ``lines``. The input frame
    is modified in place and returned.

    Args:
        points (gpd.GeoDataFrame): Frame whose rows expose a ``geom`` or
            ``geometry`` field holding the point to project.
        lines (gpd.GeoDataFrame): Frame holding the line(s) to project onto.

    Returns:
        gpd.GeoDataFrame: ``points`` with the ``PROJECTED`` column added.
    """

    def _snap(row):
        # Prefer a "geom" field when the row has one, else use "geometry".
        target = row.geom if "geom" in row else row.geometry
        along = lines.project(target)
        # NOTE(review): project/interpolate run against the whole ``lines``
        # frame, so this looks like it yields one point per line rather than
        # only the nearest one - confirm behaviour for multi-row ``lines``.
        return lines.interpolate(along)

    points["PROJECTED"] = points.apply(_snap, axis=1)
    return points
def merge_geometries(in_df: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    """
    Dissolve all geometries of a frame and return the resulting parts.

    The geometries are unioned together. When the union is a multi-part
    geometry, every part becomes its own row; when it is a single geometry
    (for example several overlapping polygons that dissolve into one), that
    geometry becomes the only row. A single-row input is returned as one row
    without being split.

    Args:
        in_df (gpd.GeoDataFrame): Frame holding the geometries to merge.

    Returns:
        gpd.GeoDataFrame: Frame with an ``ids`` column (0..n-1) and a
        ``geometry`` column, using the CRS of ``in_df``.
    """
    if not in_df.empty:
        # Kept from the original implementation: discard every column that
        # contains a missing value before unioning.
        in_df = in_df.dropna(axis=1)

    merged = in_df.geometry.unary_union

    if len(in_df) == 1:
        parts = [merged]
    elif merged is None or merged.is_empty:
        # Nothing to merge (e.g. an empty input frame).
        parts = []
    elif hasattr(merged, "geoms"):
        parts = list(merged.geoms)
    else:
        # The union collapsed into one single-part geometry, which has no
        # ``geoms`` attribute; accessing it used to raise AttributeError.
        parts = [merged]

    return gpd.GeoDataFrame(
        {"ids": list(range(len(parts)))}, geometry=parts, crs=in_df.crs
    )
def merge_geoms_df_from_file(inp_file: str) -> gpd.GeoDataFrame:
    """
    Load a file with ``gpdread`` and pass the result to ``merge_geometries``.

    Args:
        inp_file (str): Path of the file to load.

    Returns:
        gpd.GeoDataFrame: The output of ``merge_geometries`` for that file.
    """
    loaded = gpdread(inp_file=inp_file)
    return merge_geometries(loaded)
def cluster_shapes_by_distance(
    df: gpd.GeoDataFrame, distance: float, check_crs: bool = False
) -> gpd.GeoDataFrame:
    """
    Group shapes whose centroids lie within a given distance of each other.

    Based on https://gis.stackexchange.com/a/437352. Centroids are clustered
    with single-linkage agglomerative clustering, so a shape is left out of a
    group only when it is farther than ``distance`` from *all* shapes in that
    group. The input frame is modified in place: a ``group`` column holding
    the cluster label is added and the same object is returned.

    Args:
        df (gpd.GeoDataFrame): Frame holding the shapes.
        distance (float): Distance threshold for linking two centroids.
        check_crs (bool, optional): When True, require ``df`` to have a
            projected CRS. Defaults to False.

    Returns:
        gpd.GeoDataFrame: ``df`` with the ``group`` column added.

    Raises:
        AssertionError: If ``check_crs`` is True and the CRS is missing or
            not projected.
    """
    if check_crs and (df.crs is None or not df.crs.is_projected):
        # Raised explicitly (same type and message as the former ``assert``)
        # so the check still runs under ``python -O``.
        raise AssertionError(
            "geodf should be a projected crs with meters as the unit"
        )

    if len(df) < 2:
        # The clusterer needs at least two samples; zero or one shape
        # trivially forms zero or one group.
        df["group"] = list(range(len(df)))
        return df

    centers_xy = [[c.x, c.y] for c in (geom.centroid for geom in df.geometry)]

    # The distance metric is left at its default (Euclidean): the keyword for
    # it was renamed from ``affinity`` to ``metric`` across scikit-learn
    # releases, and omitting it works with both.
    cluster = AgglomerativeClustering(
        n_clusters=None,
        linkage="single",
        distance_threshold=distance,
    )
    cluster.fit(centers_xy)

    df["group"] = cluster.labels_
    return df
def bbox_df(inp_df: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    """
    Build a frame of bounding-box polygons, one per input geometry.

    Args:
        inp_df (gpd.GeoDataFrame): Frame holding the geometries.

    Returns:
        gpd.GeoDataFrame: The ``bounds`` table of ``inp_df`` (``minx``,
        ``miny``, ``maxx``, ``maxy``) with a box polygon as geometry and the
        CRS of ``inp_df``.
    """
    extents = inp_df.bounds
    corners = zip(extents.minx, extents.miny, extents.maxx, extents.maxy)

    boxes = []
    for min_x, min_y, max_x, max_y in corners:
        boxes.append(box(min_x, min_y, max_x, max_y, ccw=True))

    return gpd.GeoDataFrame(extents, crs=inp_df.crs, geometry=boxes)
def get_points_within_bbox(
    bbox: gpd.GeoDataFrame, points: gpd.GeoDataFrame
) -> gpd.GeoDataFrame:
    """
    Select the points that intersect the given bounding-box geometries.

    ``points`` is reprojected to the CRS of ``bbox`` and spatially joined
    against it with the ``intersects`` predicate. Only the columns that also
    exist in ``points`` are kept in the result.

    Args:
        bbox (gpd.GeoDataFrame): Frame holding the bounding-box geometry.
        points (gpd.GeoDataFrame): Frame holding the points to filter.

    Returns:
        gpd.GeoDataFrame: The matching points, in the CRS of ``bbox``.
    """
    reprojected = points.to_crs(bbox.crs)
    # Join against the ``bbox`` argument: the previous code passed the
    # module-level function ``bbox_df`` here, which ``sjoin`` rejects.
    # ``predicate`` replaces the ``op`` keyword, which newer geopandas removed.
    intersection = gpd.sjoin(reprojected, bbox, predicate="intersects")

    # Drop the columns contributed by the join (e.g. ``index_right``).
    common_cols = intersection.columns.intersection(points.columns)
    return intersection[common_cols]
def line_midpoint_df(geom_df: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    """
    Store the result of ``midpoint`` for every row in a ``midpoint`` column.

    The input frame is modified in place and returned.

    Args:
        geom_df (gpd.GeoDataFrame): Frame with a ``geometry`` column holding
            the line geometries.

    Returns:
        gpd.GeoDataFrame: ``geom_df`` with the ``midpoint`` column added.
    """

    def _row_midpoint(row):
        return midpoint(row["geometry"])

    geom_df["midpoint"] = geom_df.apply(_row_midpoint, axis=1)
    return geom_df
def create_temp_df(from_df: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    """
    Derive an empty (zero-row) frame from an existing one.

    The input is not modified; a deep copy of its zero-row slice is taken,
    all-NA columns are dropped and the index is reset.

    Args:
        from_df (gpd.GeoDataFrame): Frame to use as the template.

    Returns:
        gpd.GeoDataFrame: A new frame with no rows.
    """
    empty_slice = from_df[0:0]
    template = empty_slice.copy(deep=True)
    # NOTE(review): on a zero-row frame every column counts as all-NA, so this
    # dropna appears to remove all columns (and with them the geometry/CRS) -
    # confirm whether that is intended.
    return template.dropna(axis=1, how="all").reset_index(drop=True)