0%

Datahader.Networks 绘制bundling flow

Datahader.Networks 绘制bundling flow

详细文档参见 https://datashader.org/user_guide/Networks.html

我们曾经介绍了绘制普通流的方式,实际上仅仅是基于贝塞尔曲线做了插值,绘制了类似于“飞机航线”的流线。使用简单的arcgis插件,也可以绘制很不错直观的流图,比如下面我画的中国人口流动图:

但是这种流绘制方式虽然最为直观、表达最为准确,但有时候(特别是边权差异不大的时候)像进了盘丝洞,从图上无法提炼出太多的信息,因此,甲方要求我对小红书上一个博主绘制的具有聚集效应的流图进行复现,我先试了js库d3:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
function generateSegments(nodes, links) {
let bundle = { nodes: [], links: [], paths: [] };
bundle.nodes = nodes.map(d => { d.fx = d.x; d.fy = d.y; return d; });

links.forEach(d => {
let length = distance(d.source, d.target);
let total = Math.round(scales.segments(length));

let xscale = d3.scaleLinear().domain([0, total + 1]).range([d.source.x, d.target.x]);
let yscale = d3.scaleLinear().domain([0, total + 1]).range([d.source.y, d.target.y]);

let source = d.source;
let local = [source];
for (let j = 1; j <= total; j++) {
let target = { x: xscale(j), y: yscale(j) };
local.push(target);
bundle.nodes.push(target);
bundle.links.push({ source, target });
source = target;
}
local.push(d.target);
bundle.links.push({ source, target: d.target });
bundle.paths.push(local);
});
return bundle;
}

大概实现了下面的场景:

不过这种绘制方法仍然无法达到类似的效果,遂抛弃之,找到了该博主处理流数据的库:datashader,接下来我们详细讲讲如何利用datashader.Networks进行绘制,先看看user guide中对该部分的简介:

The point and line-segment plotting provided by Datashader can be put together in different ways to visualize specific types of data. For instance, network graph data, i.e., networks of nodes connected by edges, can very naturally be represented by points and lines. Here we will show examples of using Datashader’s graph-specific plotting tools, focusing on how to visualize very large graphs while allowing any portion of the rendering pipeline to replaced with components suitable for specific problems.

Datashader 使用 Hammer et al. (2009) 提出的 “Force-Directed Edge Bundling (FDEB)” 算法的改进版,让相似或空间上邻近的边“靠拢”成平滑的流线(bundles),既保持结构关系,又提升可读性。

具体来说,对于每条边,他们会被拆分为多个小段(segments),每个 segment 是一个可移动的控制点。每个控制点会受到几种力的作用:

    1. 内部张力 (Tension)
    1. 相似边吸引力 (Attractive force)
    1. 带宽控制 (Bandwidth)

通过多次迭代,方法使得带宽逐次变小,使得流互相靠近,形成束状结构。

我们来复现一下user guide提供的示例代码
导入包

1
2
3
4
5
6
7
8
9
10
import math
import numpy as np
import pandas as pd

import datashader as ds
import datashader.transfer_functions as tf
from datashader.layout import random_layout, circular_layout, forceatlas2_layout
from datashader.bundling import connect_edges, hammer_bundle

from itertools import chain

随机生成边与节点

1
2
3
4
5
6
7
8
9
np.random.seed(0)
n=100
m=20000

nodes = pd.DataFrame(["node"+str(i) for i in range(n)], columns=['name'])
edges = pd.DataFrame(np.random.randint(0,len(nodes), size=(m, 2)), columns=['source', 'target'])

circular = circular_layout(nodes, uniform=False)
randomloc = random_layout(nodes)

生成布局与定义一些绘制函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
cvsopts = dict(plot_height=400, plot_width=400)

def nodesplot(nodes, name=None, canvas=None, cat=None):
canvas = ds.Canvas(**cvsopts) if canvas is None else canvas
aggregator=None if cat is None else ds.count_cat(cat)
agg=canvas.points(nodes,'x','y',aggregator)
return tf.spread(tf.shade(agg, cmap=["#FF3333"]), px=3, name=name)

forcedirected = forceatlas2_layout(nodes, edges)

def edgesplot(edges, name=None, canvas=None):
canvas = ds.Canvas(**cvsopts) if canvas is None else canvas
return tf.shade(canvas.line(edges, 'x','y', agg=ds.count()), name=name)

def graphplot(nodes, edges, name="", canvas=None, cat=None):
if canvas is None:
xr = nodes.x.min(), nodes.x.max()
yr = nodes.y.min(), nodes.y.max()
canvas = ds.Canvas(x_range=xr, y_range=yr, **cvsopts)

np = nodesplot(nodes, name + " nodes", canvas, cat)
ep = edgesplot(edges, name + " edges", canvas)
return tf.stack(ep, np, how="over", name=name)

绘制:

1
2
3
4
5
6
7
8
9
cd = circular
fd = forcedirected

cd_d = graphplot(cd, connect_edges(cd,edges), "Circular layout")
fd_d = graphplot(fd, connect_edges(fd,edges), "Force-directed")
cd_b = graphplot(cd, hammer_bundle(cd,edges), "Circular layout, bundled")
fd_b = graphplot(fd, hammer_bundle(fd,edges), "Force-directed, bundled")

tf.Images(cd_d,fd_d,cd_b,fd_b).cols(2)

还是挺方便的。接下来我们使用自己的数据进行绘制。必要的数据包括node(包含name与xy坐标)和edge(包含代表od的source和target,可自行选择是否加上weight)。
导入包:

1
2
3
4
5
import pandas as pd
from datashader.bundling import hammer_bundle
import pyproj
import geopandas as gpd
from shapely.geometry import LineString

加载数据,并映射一下字段名到其要求的值,并且投影坐标以绘制。这里有一个非常坑的要点:
node 的 index(pandas.dataframe的index)才是和 edge 的 source 与 target 匹配的 key ,而不是 name 字段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
china_edge = pd.read_csv("/Users/fangtianyao/Desktop/其他/flowdata/edge_1.csv")
china_node = pd.read_csv("/Users/fangtianyao/Desktop/其他/flowdata/node_with_name.csv")
nodes = china_node[['iata', 'longitude', 'latitude']].rename(
columns={'iata': 'name', 'longitude': 'x84', 'latitude': 'y84'}
)
edges = china_edge[['origin', 'destination', 'count']].rename(
columns={'origin': 'source', 'destination': 'target', 'count': 'weight'}
)

proj = pyproj.Transformer.from_crs("EPSG:4326", "EPSG:3857", always_xy=True)
nodes['x'], nodes['y'] = proj.transform(nodes['x84'], nodes['y84'])
layout = nodes.copy()
layout.index = layout['name'].astype(int)
layout.index.name = None

调用hammer_bundle计算:

1
bundled_edges = hammer_bundle(layout, edges, decay=decay, initial_bandwidth=bw)

这里的 decaybw 是两个0-1的参数,这里我们先分别取0.5。

计算得到的 bundled_edges 是一个包含所有扭曲过后的边的所有构成点的df,每条边的点之间用一个空行相隔。打印给你看看。

x y weight
1301138 1.257390e+07 2.951534e+06 7.0
1301139 1.258085e+07 2.914048e+06 7.0
1301140 1.258971e+07 2.877599e+06 7.0
1301141 1.264494e+07 2.852891e+06 7.0
1301142 NaN NaN NaN

为了绘制,我们使用该数据创建 GeoDataFrame:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
lines = []
current_points = []
current_weight = None

for idx, row in bundled_edges.iterrows():
if pd.isna(row['x']) or pd.isna(row['y']):
# 遇到空行,生成上一条边
if len(current_points) >= 2:
line = LineString(current_points)
lines.append({'weight': current_weight, 'geometry': line})
current_points = []
current_weight = None
else:
current_points.append((row['x'], row['y']))
# 假设 weight 相同,取第一个点的 weight
if current_weight is None:
current_weight = row['weight']

if len(current_points) >= 2:
line = LineString(current_points)
lines.append({'weight': current_weight, 'geometry': line})

# 创建 GeoDataFrame
gdf = gpd.GeoDataFrame(lines, crs="EPSG:3857")

我们用这个数据绘制流在地图上:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import geopandas as gpd
import pandas as pd
import matplotlib.pyplot as plt
from shapely.geometry import Point
import contextily as ctx

nodes_gdf = gpd.GeoDataFrame(
nodes,
geometry=[Point(xy) for xy in zip(nodes['x'], nodes['y'])],
crs="EPSG:3857"
)

edges_gdf = gdf.copy()

# 计算线宽
min_width = 0.2
max_width = 5.0
weights = edges_gdf['weight'].astype(float)
line_widths = min_width + (weights - weights.min()) / (weights.max() - weights.min()) * (max_width - min_width)

# 绘图
fig, ax = plt.subplots(figsize=(12, 10), dpi=300)

# 画边
edges_gdf.plot(ax=ax, linewidth=line_widths, edgecolor='#305669', alpha=0.6)

# 画节点
nodes_gdf.plot(ax=ax, color='#0C2B4E', markersize=15, alpha=0.9)

# 添加底图
ctx.add_basemap(ax, source=ctx.providers.OpenStreetMap.Mapnik, crs=nodes_gdf.crs)

ax.set_title("fty", fontsize=16)
ax.set_xlabel("Longitude")
ax.set_ylabel("Latitude")
plt.axis('equal')
plt.show()

为了比较不同 decaybw 参数组合的效果,我做了一个小组合测试,读者可以自己进行测试,亦可以参考下user guide里面基于随机数据做的测试。

还是不错的吧😌

最后给出封装好的函数,该函数用于对空间网络数据执行基于 Hammer 算法 的边捆绑(Edge Bundling)操作,以减少边的视觉交叉并增强整体结构的空间可读性。
输入参数
• edge_path (str):边数据文件路径,要求为 CSV 格式,包含源节点、目标节点及权重等字段。
• node_path (str):节点数据文件路径,要求为 CSV 格式,包含节点唯一标识及经纬度坐标字段。
• nodes_id_col (str):节点文件中表示节点唯一编号的列名,默认 “node_id” 务必是int类型!!!
• node_lon_col (str):节点经度列名,默认 “lon”。
• node_lat_col (str):节点纬度列名,默认 “lat”。
• edge_source_col (str):边文件中表示起点节点的列名,默认 “source” 和上面的node_id匹配,务必是int类型!!!
• edge_target_col (str):边文件中表示终点节点的列名,默认 “target” 和上面的node_id匹配,务必是int类型!!!
• weight_col (str):边文件中表示权重(如流量、强度)的列名,默认 “weight”。
• decay (float):Hammer 算法的衰减参数,控制边的收敛速度(0–1之间)。
• bw (float):初始带宽参数,控制捆绑的紧密程度(0–1之间)。
• input_crs (str):输入数据的坐标参考系,默认 “EPSG:4326”(WGS84)。
• output (bool):是否输出文件,默认 True。
• output_format (Literal[“geojson”, “shp”]):输出文件格式,默认 “geojson”。
• output_path (Optional[str]):输出文件保存路径,若为 None,则仅返回结果不保存。
输出结果
• 返回值 (geopandas.GeoDataFrame):
函数返回一个包含边捆绑后几何结果的 GeoDataFrame,其属性包括原始边的全部字段及对应的空间几何 (LineString)。结果的坐标参考系为 “EPSG:4326”,可直接用于 GIS 可视化或空间分析。
若设置 output=True,则函数还会在指定路径下导出相应的 GeoJSON 或 Shapefile 文件。

1
2
3
4
5
6
7
import pandas as pd
import geopandas as gpd
import pyproj
from datashader.bundling import hammer_bundle
from shapely.geometry import LineString
from typing import Literal, Optional
import copy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
def perform_edge_bundling(
edge_path: str,
node_path: str,
nodes_id_col: str = "node_id",
node_lon_col: str = "lon",
node_lat_col: str = "lat",
edge_source_col: str = "source",
edge_target_col: str = "target",
weight_col: str = "weight",
decay: float = 0.5,
bw: float = 0.5,
input_crs: str = "EPSG:4326",
output: bool = True,
output_format: Literal["geojson", "shp"] = "geojson",
output_path: Optional[str] = None,
) -> gpd.GeoDataFrame:
"""
Perform force-directed edge bundling using the Hammer algorithm
on spatial graph data, and export the result as a geospatial file.

Parameters
----------
edge_path : str
Path to the CSV file containing edge data.
node_path : str
Path to the CSV file containing node data.
nodes_id_col : str, default "node_id"
Column name for node identifiers in the node CSV.
node_lon_col : str, default "lon"
Column name for node longitude.
node_lat_col : str, default "lat"
Column name for node latitude.
edge_source_col : str, default "source"
Column name for edge source node ID.
edge_target_col : str, default "target"
Column name for edge target node ID.
weight_col : str, default "weight"
Column name for edge weights.
decay : float, default 0.5
Decay parameter of the hammer_bundle algorithm (0 < decay < 1).
bw : float, default 0.5
Initial bandwidth parameter of hammer_bundle (controls bundle tightness).
input_crs : str, default "EPSG:4326"
Coordinate reference system (CRS) of the input node coordinates.
Usually "EPSG:4326" for WGS84 or "EPSG:3857" for Web Mercator.
output : bool, default True
Whether to save the output file.
output_format : {"geojson", "shp"}, default "geojson"
Desired output file format.
output_path : str, optional
Path to save the output file. If None, the file will not be written.

Returns
-------
gdf : geopandas.GeoDataFrame
GeoDataFrame containing bundled edge geometries and their weights,
projected in WGS84 (EPSG:4326).

Raises
------
FileNotFoundError
If either `edge_path` or `node_path` does not exist.
ValueError
If required columns are missing in the provided CSVs.
"""

print("[1/6] 读取节点与边文件...")
edges = pd.read_csv(edge_path)
edges_copy = copy.deepcopy(edges)
nodes = pd.read_csv(node_path)

print(f"节点数: {len(nodes)}, 边数: {len(edges)}")

print("[2/6] 检查输入字段有效性...")
for col in [nodes_id_col, node_lon_col, node_lat_col]:
if col not in nodes.columns:
raise ValueError(f"Missing required column '{col}' in node file.")
for col in [edge_source_col, edge_target_col, weight_col]:
if col not in edges.columns:
raise ValueError(f"Missing required column '{col}' in edge file.")

edges = edges[[edge_source_col, edge_target_col, weight_col]].rename(
columns={edge_source_col: 'source', edge_target_col: 'target', weight_col: 'weight'}
)
nodes = nodes[[nodes_id_col, node_lon_col, node_lat_col]]

edges["source"] = edges["source"].astype(int)
edges["target"] = edges["target"].astype(int)

print("[3/6] 坐标转换为 EPSG:3857 ...")
proj_to_3857 = pyproj.Transformer.from_crs(input_crs, "EPSG:3857", always_xy=True)
nodes["x"], nodes["y"] = proj_to_3857.transform(nodes[node_lon_col], nodes[node_lat_col])

layout = nodes.rename(columns={nodes_id_col: "name"})[["name", "x", "y"]]
layout.index = layout["name"].astype(int)
layout.index.name = None

print("[4/6] 执行 Hammer 边捆绑算法 ...")
bundled = hammer_bundle(layout, edges, decay=decay, initial_bandwidth=bw)

print("[5/6] 构造捆绑线段 ...")
lines = []
current_points = []
edge_index = 0

for _, row in bundled.iterrows():
if pd.isna(row["x"]) or pd.isna(row["y"]):
if len(current_points) >= 2 and edge_index < len(edges_copy):
edge_attrs = edges_copy.iloc[edge_index].to_dict()
edge_attrs["geometry"] = LineString(current_points)
lines.append(edge_attrs)
current_points = []
edge_index += 1
else:
current_points.append((row["x"], row["y"]))

if len(current_points) >= 2 and edge_index < len(edges_copy):
edge_attrs = edges_copy.iloc[edge_index].to_dict()
edge_attrs["geometry"] = LineString(current_points)
lines.append(edge_attrs)

gdf = gpd.GeoDataFrame(lines, crs="EPSG:3857")
gdf = gdf.to_crs("EPSG:4326")

print(f"生成 GeoDataFrame,共 {len(gdf)} 条边。")

if output:
print("[6/6] 输出结果文件")
if output_path:
if output_format == "geojson":
gdf.to_file(output_path, driver="GeoJSON")
elif output_format == "shp":
gdf.to_file(output_path, driver="ESRI Shapefile")
else:
raise ValueError("Unsupported output format. Use 'geojson' or 'shp'.")

print(f"文件已保存至: {output_path}")

else:
print("未提供输出路径,未保存文件。")

return gdf