並行程式設計簡介#
本教學作為 JAX 中單一程式多資料 (SPMD) 程式碼的裝置並行性簡介。SPMD 是一種並行技術,其中相同的計算(例如神經網路的前向傳遞)可以在不同裝置(例如多個 GPU 或 Google TPU)上針對不同的輸入資料(例如,批次中的不同輸入)並行執行。
本教學涵蓋三種並行計算模式
透過
jax.jit()
自動並行化:編譯器選擇最佳計算策略(又稱「編譯器掌舵」)。使用
jax.jit()
和jax.lax.with_sharding_constraint()
的半自動並行化使用
jax.experimental.shard_map.shard_map()
手動控制的完全手動並行化:shard_map
啟用每裝置程式碼和明確的通訊集合
使用這些 SPMD 思維模式,您可以將為單一裝置編寫的函數轉換為可以在多個裝置上並行執行的函數。
如果您在 Google Colab 筆記本中執行這些範例,請確保您的硬體加速器是最新的 Google TPU,方法是檢查您的筆記本設定:執行階段 > 變更執行階段類型 > 硬體加速器 > TPU v2(它提供八個裝置供您使用)。
import jax
jax.devices()
[TpuDevice(id=0, process_index=0, coords=(0,0,0), core_on_chip=0),
TpuDevice(id=1, process_index=0, coords=(0,0,0), core_on_chip=1),
TpuDevice(id=2, process_index=0, coords=(1,0,0), core_on_chip=0),
TpuDevice(id=3, process_index=0, coords=(1,0,0), core_on_chip=1),
TpuDevice(id=4, process_index=0, coords=(0,1,0), core_on_chip=0),
TpuDevice(id=5, process_index=0, coords=(0,1,0), core_on_chip=1),
TpuDevice(id=6, process_index=0, coords=(1,1,0), core_on_chip=0),
TpuDevice(id=7, process_index=0, coords=(1,1,0), core_on_chip=1)]
主要概念:資料分片#
以下所有分散式計算方法的關鍵概念是資料分片,它描述了資料如何在可用裝置上佈局。
JAX 如何理解資料在裝置間的佈局?JAX 的資料類型 jax.Array
不可變陣列資料結構表示跨越單一或多個裝置的物理儲存陣列,並有助於使並行性成為 JAX 的核心功能。jax.Array
物件的設計考慮了分散式資料和計算。每個 jax.Array
都有一個關聯的 jax.sharding.Sharding
物件,它描述了每個全域裝置所需的全域資料分片。當您從頭開始建立 jax.Array
時,您也需要建立其 Sharding
。
在最簡單的情況下,陣列會在單一裝置上分片,如下所示
import jax.numpy as jnp
arr = jnp.arange(32.0).reshape(4, 8)
arr.devices()
{TpuDevice(id=0, process_index=0, coords=(0,0,0), core_on_chip=0)}
arr.sharding
SingleDeviceSharding(device=TpuDevice(id=0, process_index=0, coords=(0,0,0), core_on_chip=0))
為了更視覺化地表示儲存佈局,jax.debug
模組提供了一些輔助工具來視覺化陣列的分片。例如,jax.debug.visualize_array_sharding()
顯示陣列如何在單一裝置的記憶體中儲存
jax.debug.visualize_array_sharding(arr)
TPU 0
若要建立具有非平凡分片的陣列,您可以為陣列定義 jax.sharding
規格,並將其傳遞給 jax.device_put()
。
在此,定義 NamedSharding
,它指定具有具名軸的 N 維裝置網格,其中 jax.sharding.Mesh
允許精確的裝置放置
from jax.sharding import PartitionSpec as P
mesh = jax.make_mesh((2, 4), ('x', 'y'))
sharding = jax.sharding.NamedSharding(mesh, P('x', 'y'))
print(sharding)
NamedSharding(mesh=Mesh('x': 2, 'y': 4), spec=PartitionSpec('x', 'y'))
將此 Sharding
物件傳遞給 jax.device_put()
,您可以取得分片陣列
arr_sharded = jax.device_put(arr, sharding)
print(arr_sharded)
jax.debug.visualize_array_sharding(arr_sharded)
[[ 0. 1. 2. 3. 4. 5. 6. 7.]
[ 8. 9. 10. 11. 12. 13. 14. 15.]
[16. 17. 18. 19. 20. 21. 22. 23.]
[24. 25. 26. 27. 28. 29. 30. 31.]]
TPU 0 TPU 1 TPU 2 TPU 3 TPU 6 TPU 7 TPU 4 TPU 5
此處的裝置編號不是數字順序,因為網格反映了裝置的底層環面拓撲。
1. 透過 jit
自動並行化#
一旦您有了分片資料,執行並行計算的最簡單方法就是將資料傳遞給 jax.jit()
編譯的函數!在 JAX 中,您只需要指定您希望如何分割程式碼的輸入和輸出,編譯器將會找出如何:1) 分割所有內部內容;以及 2) 編譯裝置間通訊。
jit
背後的 XLA 編譯器包含用於最佳化跨多個裝置計算的啟發式方法。在最簡單的情況下,這些啟發式方法可歸結為計算跟隨資料。
為了示範自動並行化在 JAX 中的運作方式,以下範例使用 jax.jit()
修飾的階段輸出函數:它是一個簡單的元素級函數,其中每個分片的計算將在與該分片關聯的裝置上執行,並且輸出以相同方式分片
@jax.jit
def f_elementwise(x):
return 2 * jnp.sin(x) + 1
result = f_elementwise(arr_sharded)
print("shardings match:", result.sharding == arr_sharded.sharding)
shardings match: True
隨著計算變得更複雜,編譯器會決定如何最佳地傳播資料的分片。
在此,您沿著 x
的前導軸求和,並視覺化結果值如何在多個裝置之間儲存(使用 jax.debug.visualize_array_sharding()
)
@jax.jit
def f_contract(x):
return x.sum(axis=0)
result = f_contract(arr_sharded)
jax.debug.visualize_array_sharding(result)
print(result)
TPU 0,6 TPU 1,7 TPU 2,4 TPU 3,5
[48. 52. 56. 60. 64. 68. 72. 76.]
結果是部分複製的:也就是說,陣列的前兩個元素在裝置 0
和 6
上複製,第二個在 1
和 7
上複製,依此類推。
2. 具有約束的半自動分片#
如果您希望對特定計算中使用的分片有一些控制權,JAX 提供 with_sharding_constraint()
函數。您可以將 jax.lax.with_sharding_constraint()
(取代 jax.device_put()
) 與 jax.jit()
一起使用,以更精確地控制編譯器如何約束中介值和輸出的分佈方式。
例如,假設在上面的 f_contract
中,您希望輸出不是部分複製的,而是完全跨八個裝置分片的
@jax.jit
def f_contract_2(x):
out = x.sum(axis=0)
sharding = jax.sharding.NamedSharding(mesh, P('x'))
return jax.lax.with_sharding_constraint(out, sharding)
result = f_contract_2(arr_sharded)
jax.debug.visualize_array_sharding(result)
print(result)
TPU 0 TPU 1 TPU 2 TPU 3 TPU 6 TPU 7 TPU 4 TPU 5
[48. 52. 56. 60. 64. 68. 72. 76.]
這為您提供了一個具有您想要的特定輸出分片的函數。
3. 使用 shard_map
的手動並行化#
在上面探索的自動並行化方法中,您可以編寫一個函數,就好像您正在對完整資料集進行操作一樣,而 jit
會將該計算分割到多個裝置上。相反地,使用 jax.experimental.shard_map.shard_map()
,您可以編寫將處理單一資料分片的函數,而 shard_map
將建構完整的函數。
shard_map
的運作方式是將函數映射到特定的裝置網格上 (shard_map
映射到分片上)。在以下範例中
與之前一樣,
jax.sharding.Mesh
允許精確的裝置放置,並具有用於邏輯和物理軸名稱的軸名稱參數。in_specs
引數決定分片大小。out_specs
引數識別區塊如何組裝回一起。
注意: 如果您需要,jax.experimental.shard_map.shard_map()
程式碼可以在 jax.jit()
內部運作。
from jax.experimental.shard_map import shard_map
mesh = jax.make_mesh((8,), ('x',))
f_elementwise_sharded = shard_map(
f_elementwise,
mesh=mesh,
in_specs=P('x'),
out_specs=P('x'))
arr = jnp.arange(32)
f_elementwise_sharded(arr)
Array([ 1. , 2.682942 , 2.818595 , 1.28224 , -0.513605 ,
-0.9178486 , 0.44116896, 2.3139732 , 2.9787164 , 1.824237 ,
-0.08804226, -0.99998045, -0.07314599, 1.8403342 , 2.9812148 ,
2.3005757 , 0.42419332, -0.92279506, -0.50197446, 1.2997544 ,
2.8258905 , 2.6733112 , 0.98229736, -0.69244075, -0.81115675,
0.7352965 , 2.525117 , 2.912752 , 1.5418116 , -0.32726777,
-0.97606325, 0.19192469], dtype=float32)
您編寫的函數只會「看到」單一批次的資料,您可以透過列印裝置本機形狀來檢查
x = jnp.arange(32)
print(f"global shape: {x.shape=}")
def f(x):
print(f"device local shape: {x.shape=}")
return x * 2
y = shard_map(f, mesh=mesh, in_specs=P('x'), out_specs=P('x'))(x)
global shape: x.shape=(32,)
device local shape: x.shape=(4,)
因為您的每個函數只「看到」資料的裝置本機部分,這表示類似聚合的函數需要一些額外的思考。
例如,以下是 jax.numpy.sum()
的 shard_map
的樣子
def f(x):
return jnp.sum(x, keepdims=True)
shard_map(f, mesh=mesh, in_specs=P('x'), out_specs=P('x'))(x)
Array([ 6, 22, 38, 54, 70, 86, 102, 118], dtype=int32)
您的函數 f
在每個分片上單獨運作,而產生的總和反映了這一點。
如果您想要跨分片求和,您需要使用集合運算(例如 jax.lax.psum()
)明確要求它
def f(x):
sum_in_shard = x.sum()
return jax.lax.psum(sum_in_shard, 'x')
shard_map(f, mesh=mesh, in_specs=P('x'), out_specs=P())(x)
Array(496, dtype=int32)
由於輸出不再具有分片維度,因此設定 out_specs=P()
(回想一下,out_specs
引數識別區塊如何在 shard_map
中組裝回一起)。
三種方法的比較#
有了這些概念在腦海中,讓我們比較簡單神經網路層的三種方法。
首先定義您的標準函數,如下所示
@jax.jit
def layer(x, weights, bias):
return jax.nn.sigmoid(x @ weights + bias)
import numpy as np
rng = np.random.default_rng(0)
x = rng.normal(size=(32,))
weights = rng.normal(size=(32, 4))
bias = rng.normal(size=(4,))
layer(x, weights, bias)
Array([0.02138912, 0.893112 , 0.59892005, 0.97742504], dtype=float32)
您可以使用 jax.jit()
並傳遞適當分片的資料,以分散式方式自動執行此操作。
如果您以相同方式分片 x
和 weights
的前導軸,則矩陣乘法將自動並行發生
mesh = jax.make_mesh((8,), ('x',))
sharding = jax.sharding.NamedSharding(mesh, P('x'))
x_sharded = jax.device_put(x, sharding)
weights_sharded = jax.device_put(weights, sharding)
layer(x_sharded, weights_sharded, bias)
Array([0.02138912, 0.893112 , 0.59892005, 0.97742504], dtype=float32)
或者,您可以在函數中使用 jax.lax.with_sharding_constraint()
以自動分發未分片的輸入
@jax.jit
def layer_auto(x, weights, bias):
x = jax.lax.with_sharding_constraint(x, sharding)
weights = jax.lax.with_sharding_constraint(weights, sharding)
return layer(x, weights, bias)
layer_auto(x, weights, bias) # pass in unsharded inputs
Array([0.02138914, 0.89311206, 0.5989201 , 0.97742516], dtype=float32)
最後,您可以使用 shard_map
執行相同的操作,使用 jax.lax.psum()
來指示矩陣乘積所需的跨分片集合
from functools import partial
@jax.jit
@partial(shard_map, mesh=mesh,
in_specs=(P('x'), P('x', None), P(None)),
out_specs=P(None))
def layer_sharded(x, weights, bias):
return jax.nn.sigmoid(jax.lax.psum(x @ weights, 'x') + bias)
layer_sharded(x, weights, bias)
Array([0.02138914, 0.89311206, 0.5989201 , 0.97742516], dtype=float32)
下一步#
本教學簡要介紹了 JAX 中的分片和並行計算。
若要深入瞭解每種 SPMD 方法,請查看以下文件