Pallas 非同步操作#

背景 + 動機#

我們希望在 Pallas 中公開 API,以明確地在多個核心之間重疊計算和通訊。

XLA 非同步分解#

作為動機,請考慮以下 JAX 虛擬碼

def f(x):
  y = ppermute(x)
  z = x + 1
  return y, z

在這個函式中,我們可以同時執行 ppermutex + 1。這是 XLA 自動執行的最佳化,透過

  1. ppermute 分解為 ppermute_startppermute_done 操作,這些操作透過 future 連接。

  2. ppermute_startppermute_done 之間排程 x + 1

產生以下程式

def f(x):
  fut = ppermute_start(x)
  z = x + 1  # happens at the same time as ppermute
  y = ppermute_done(fut)
  return y, z

核心內的非同步操作#

現在想像一下,我們沒有使用 XLA 的 ppermute,而是使用我們自己的自訂 Pallas ppermute

def ppermute_kernel(x_ref, y_ref, send_sem, recv_sem):
  right_neighbor = ...
  descriptor = pltpu.make_async_remote_copy(x_ref, y_ref, send_sem, recv_sem, device_id=right_neighbor)
  descriptor.start()
  descriptor.wait_send()
  descriptor.wait_recv()

def ppermute(x):
  return pl.pallas_call(ppermute_kernel, out_shape=x, ...)(x)

目前,我們無法像 XLA 那樣將 ppermute 分解為 start/done 對,因此我們明確地將 x + 1 **融合**到核心中。

def add_one(x_ref, z_ref):
  z_ref[...] = x_ref[...] + 1

def ppermute_add_one_kernel(x_ref, y_ref, z_ref, send_sem, recv_sem):
  right_neighbor = ...
  descriptor = pltpu.make_async_remote_copy(x_ref, y_ref, send_sem, recv_sem, device_id=right_neighbor)
  descriptor.start()

  # Explicitly schedule inner kernel between start/wait
  pltpu.emit_pipeline(add_one)(x_ref, z_ref)

  descriptor.wait_send()
  descriptor.wait_recv()

def ppermute_and_add_one(x):
  return pl.pallas_call(ppermute_add_one_kernel, out_shape=(x, x), ...)(x)

目標是能夠為啟動 ppermute 和等待其完成撰寫個別的核心,以便我們可以在其間使用常規的 x + 1 (或我們想要的任何計算)。這使程式碼更具可讀性、可維護性且更不易出錯。

我們如何在 TPU 上實作分解的 Pallas 非同步操作?#

在 Pallas 中實作分解的非同步操作時,主要需要弄清楚的是在它們之間傳遞的 future 包含什麼。具體來說,它必須包含一些關於背景中正在發生的操作的重要狀態。

如果我們查看 Pallas 程式碼,我們可以看到我們需要一個「描述符」來啟動和等待遠端複製。我們可以將此描述符從 Pallas 核心中引出,然後將其傳遞到另一個核心中嗎?嗯,有點。底層 TPU 硬體透過一對信號量追蹤非同步操作進度:send_sem 使我們能夠等待裝置何時完成將資料傳送到其鄰居,而 recv_sem 追蹤從其鄰居傳送到裝置的資料傳輸。如果我們想像撰寫一個啟動核心和一個完成核心,那麼我們需要從啟動傳遞到完成的只是信號量以及有關在這些信號量上等待多久的一些資訊。

我們可以透過擴充 Pallas 以支援從核心傳回信號量來做到這一點。

def ppermute_start_kernel(
    in_ref, send_sem, recv_sem, out_ref, *, axis_name,
):
  axis_size = jax.lax.psum(1, axis_name)
  left_neighbor = jax.lax.rem(
      jax.lax.axis_index(axis_name) - 1 + axis_size, axis_size
  )
  right_neighbor = jax.lax.rem(jax.lax.axis_index(axis_name) + 1, axis_size)
  barrier_sem = pltpu.get_barrier_semaphore()
  pltpu.semaphore_signal(barrier_sem, device_id=left_neighbor)
  pltpu.semaphore_wait(barrier_sem, 1)
  pltpu.make_async_remote_copy(
      in_ref, out_ref, send_sem, recv_sem, device_id=right_neighbor
  ).start()

def ppermute_start(x, *, axis_name) -> tuple[Semaphore, Semaphore, Array]:
  send_sem, recv_sem, out = pl.pallas_call(
      functools.partial(ppermute_start_kernel, axis_name=axis_name),
      out_shape=(
          pltpu.SemaphoreType.DMA(()),
          pltpu.SemaphoreType.DMA(()),
          jax.ShapeDtypeStruct(
              x.shape,
              dtype=x.dtype,
          ),
      ),
      in_specs=[
          pl.BlockSpec(memory_space=pltpu.ANY),
      ],
      out_specs=(
          pl.BlockSpec(memory_space=pltpu.SEMAPHORE),
          pl.BlockSpec(memory_space=pltpu.SEMAPHORE),
          pl.BlockSpec(memory_space=pltpu.ANY),
      ),
  )(x)
  return send_sem, recv_sem, out

請注意,這裡發生了一些微妙的事情。Pallas 正在告訴 XLA 它希望某些輸出是信號量 (又名同步標 flags),並且 XLA 會將它們視為「保留」(例如,當它們在 XLA 程式中處於活動狀態時,這些同步標 flags 無法由其他核心配置)。它們的行為類似於屏障信號量,屏障信號量是由 XLA 管理的保留信號量。

另一個需要注意的是,我們從啟動核心傳回輸出緩衝區 out,*同時它正在被主動複製到其中*。

現在我們撰寫執行封鎖操作的 done 核心。我們將 out 傳遞到核心中,以計算封鎖信號量所需的形狀。

def ppermute_done_kernel(ref, send_sem, recv_sem, _):
  pltpu.make_async_copy(ref, ref, send_sem).wait()
  pltpu.make_async_copy(ref, ref, recv_sem).wait()

def ppermute_done(send_sem, recv_sem, out) ->Array:
  out = pl.pallas_call(
      ppermute_done_kernel,
      out_shape=(
          jax.ShapeDtypeStruct(
              out.shape,
              dtype=out.dtype,
          ),
      ),
      in_specs=[
          pl.BlockSpec(memory_space=pltpu.ANY),
          pl.BlockSpec(memory_space=pltpu.SEMAPHORE),
          pl.BlockSpec(memory_space=pltpu.SEMAPHORE),
      ],
      out_specs=pl.BlockSpec(memory_space=pltpu.ANY),
      input_output_aliases={0:0}
  )(out, send_sem, recv_sem)
  return out

注意:我們在此處對輸出緩衝區進行 i/o 別名,以保證消費者在 ppermute_done 的下游。

我們現在可以實作分解的集體置換。

def f(x):
  fut = ppermute_start(x)
  z = x + 1  # happens at the same time as ppermute
  y = ppermute_done(fut)
  return y, z

或者我們可以嗎?

為什麼這樣行不通#

這仍然存在三個問題,每個問題都在 Pallas 之外或多或少存在。以下是它們的高階概述。

  1. 排程 - 僅僅因為我們撰寫了 ppermute_start,然後是 x + 1,然後是 ppermute_done,並不能保證它們會按照該順序發生。XLA 負責排程,因此當我們撰寫 JAX 程式時,我們正在設定 XLA 將尊重的資料依賴性,但 XLA 不會尊重 JAX 中撰寫的特定操作順序。

  2. 生命週期 - XLA 假設一旦值超出依賴圖中的範圍,就可以釋放其記憶體以供其他值使用。如果我們有一個非同步複製 x -> y 的操作,我們需要確保 x 在複製完成之前一直處於活動狀態,否則我們將從垃圾記憶體中複製。

  3. 防禦性副本 - XLA 保留建立值副本的權利。我們需要確保我們不會引入不必要的副本,以 a) 避免不必要的執行階段額外負荷,以及 b) 確保正確性。

我們將逐一討論這些問題並提出修正方法。

排程#

我們如何在 JAX 中明確強制操作以特定順序發生?請注意,這不是 Pallas 特有的問題,如果我們使用替代方法實作了非同步操作,我們仍然會遇到這個問題。

一種方法是在 XLA 程式中引入最佳化屏障。最佳化屏障將阻止 XLA 在其周圍移動操作。

這是我們的原始程式碼

def f(x):
  fut = ppermute_start(x)
  z = x + 1
  y = ppermute_done(fut)
  return y, z

XLA 可以選擇在以下三個位置中的任何一個執行 x + 1

def f(x):
  z = x + 1
  fut = ppermute_start(x)
  y = ppermute_done(fut)
  return y, z

# OR

def f(x):
  fut = ppermute_start(x)
  z = x + 1
  y = ppermute_done(fut)
  return y, z

# OR

def f(x):
  fut = ppermute_start(x)
  y = ppermute_done(fut)
  z = x + 1
  return y, z

為了強制 x + 1 發生在 ppermute 操作之間,我們可以使用 optimization_barrier,它在語義上是恆等函式 (即 lambda x: x),但在值之間引入了明確的資料依賴性。具體來說,如果我們使 x (在 x + 1 中使用) 依賴於 ppermute_start 傳回的 fut,則它必須在 ppermute_start 之後發生。

我們還引入了一個依賴性,強制輸出值 y 依賴於 z

def f(x):
  fut = ppermute_start(x)
  x, fut = optimization_barrier((x, fut))  # x now depends on fut
  z = x + 1
  z, fut = optimization_barrier((z, fut)) # fut now depends on z
  y = ppermute_done(fut)
  return y, z

optimization_barrier 對於我們明確寫出排程來說是一個足夠好的工具。

生命週期#

讓我們再次查看我們的原始程式碼,並假設操作以正確的順序發生。

def f(x):
  fut = ppermute_start(x)
  z = x + 1
  y = ppermute_done(fut)
  return y, z

讓我們看看程式中 XLA 認為可以釋放 x 緩衝區的點。它將是 x 不再使用的點之後,特別是在 z = x + 1 之後。

def f(x):
  fut = ppermute_start(x)
  z = x + 1
  # XLA can free x here!
  y = ppermute_done(fut)
  return y, z

如果 XLA 在 z = x + 1 完成後釋放 x,我們會遇到一個非常糟糕的問題。ppermute 可能仍在 z = x + 1 之後主動將 x 複製到鄰居,這意味著如果 x 被釋放,ppermute 將從垃圾記憶體中讀取!

我們如何將 x 的生命週期延長到 ppermute_done?嗯,我們可以引入資料依賴性!我們需要稍微修改我們的核心才能實現這一點。

首先,我們重寫 ppermute_start 以傳回 x,透過核心對其進行別名。

def ppermute_start_kernel(
    in_ref, send_sem, recv_sem, out_ref, _, *, axis_name,
):
  axis_size = jax.lax.psum(1, axis_name)
  left_neighbor = jax.lax.rem(
      jax.lax.axis_index(axis_name) - 1 + axis_size, axis_size
  )
  right_neighbor = jax.lax.rem(jax.lax.axis_index(axis_name) + 1, axis_size)
  barrier_sem = pltpu.get_barrier_semaphore()
  pltpu.semaphore_signal(barrier_sem, device_id=left_neighbor)
  pltpu.semaphore_wait(barrier_sem, 1)
  pltpu.make_async_remote_copy(
      in_ref, out_ref, send_sem, recv_sem, device_id=right_neighbor
  ).start()

def ppermute_start(x, *, axis_name) -> tuple[Semaphore, Semaphore, Array, Array]:
  send_sem, recv_sem, x, out = pl.pallas_call(
      functools.partial(ppermute_start_kernel, axis_name=axis_name),
      out_shape=(
          pltpu.SemaphoreType.DMA(()),
          pltpu.SemaphoreType.DMA(()),
          jax.ShapeDtypeStruct(
              x.shape,
              dtype=x.dtype,
          ),
	   jax.ShapeDtypeStruct(
              x.shape,
              dtype=x.dtype,
          ),
      ),
      in_specs=[
          pl.BlockSpec(memory_space=pltpu.ANY),
      ],
      out_specs=(
          pl.BlockSpec(memory_space=pltpu.SEMAPHORE),
          pl.BlockSpec(memory_space=pltpu.SEMAPHORE),
          pl.BlockSpec(memory_space=pltpu.ANY),
          pl.BlockSpec(memory_space=pltpu.ANY),
      ),
      input_output_aliases={0:2}
  )(x)
  return send_sem, recv_sem, x, out

然後我們讓 ppermute_done 接收 x 並且不對其執行任何操作。

def ppermute_done_kernel(_, ref, send_sem, recv_sem, _):
  pltpu.make_async_copy(ref, ref, send_sem).wait()
  pltpu.make_async_copy(ref, ref, recv_sem).wait()

def ppermute_done(send_sem, recv_sem, x, out) ->Array:
  out = pl.pallas_call(
      ppermute_done_kernel,
      out_shape=(
          jax.ShapeDtypeStruct(
              out.shape,
              dtype=out.dtype,
          ),
      ),
      in_specs=[
          pl.BlockSpec(memory_space=pltpu.ANY),
          pl.BlockSpec(memory_space=pltpu.ANY),
          pl.BlockSpec(memory_space=pltpu.SEMAPHORE),
          pl.BlockSpec(memory_space=pltpu.SEMAPHORE),
      ],
      out_specs=pl.BlockSpec(memory_space=pltpu.ANY),
      input_output_aliases={1:0}
  )(x, out, send_sem, recv_sem)
  return out

現在當我們撰寫

def f(x):
  *sems, x ,out = ppermute_start(x)
  z = x + 1
  y = ppermute_done(*sems, x, out)
  return y, z

XLA 無法再釋放 x,因為它是 ppermute_done 的輸入!這表示 x 的生命週期與 ppermute 相關聯,並且此程式碼現在是正確的。

防禦性副本#

XLA 在其緩衝區指派傳遞中,分析哪些緩衝區彼此別名,並在別名其輸入之一的操作不是該輸入的最終消費者時插入副本。

背景#

這是一個簡單的範例。假設我們有一個操作 add_one_inplace,它接收一個陣列並加一,但承諾就地執行。

以下程式碼將是合法的。

def f():
  x = jnp.arange(...)
  y = add_one_inplace(x)  return y

但是,如果 x 也有一個單獨的消費者,則程式可能無法正確執行。

def f():
  x = jnp.arange(...)
  y = add_one_inplace(x)
  return y, x * 2 # another x consumer!

這是因為 x * 2 對原始 x 進行操作,但 add_one_inplace 會覆蓋 x 中的值。x * 2 需要確保讀取 x 的原始值,而不是我們將其遞增 1 後的值。XLA 注意到這一點並插入一個 `copy` 操作 (在語義上是恆等式,但輸入和輸出緩衝區將會不同)。

def f(x):
  x2 = copy(x)
  y = add_one_inplace(x2)
  return y, x * 2

XLA 中的此傳遞透過強制它們有效地與 `copy` 操作異地執行,來確保在存在執行就地更新的操作時的正確性。

具有下游操作的副本#

讓我們重新審視我們在 ppermute 期間加 1 的範例。

def f(x):
  fut = ppermute_start(x)
  z = x + 1
  y = ppermute_done(fut)
  return y, z

如果我們將 future 解壓縮為其組件,我們將看到別名模式

def f(x):
  *sems, x2, y = ppermute_start(x)
  z = x + 1
  y = ppermute_done((*sems, x2, y))
  return y, z

我們知道 xppermute_start 後保持不變 (也就是說,xx2 相同),但 XLA 並不知道。實際上,它看起來像我們給 XLA 的 add_one_inplace 範例,它保守地假設 ppermute_start 變更了 x,而 x2 是新的別名結果。因此,當我們執行 z = x + 1 時,我們會遇到原始緩衝區的消費者。因此,XLA 引入了一個副本!

def f(x):
  x2 = copy(x)
  *sems, x2, y = ppermute_start(x2)
  z = x + 1
  y = ppermute_done((*sems, x2, y))
  return y, z

此副本是不必要的,因為我們知道 x2x 相比沒有變更。為了移除此副本,我們需要某種機制來通知 XLA 我們只是轉發一個值。但是,在沒有這種機制的情況下,我們可以稍微重寫我們的程式,以明確使用 x2 而不是 x

def f(x):
  *sems, x2, y = ppermute_start(x)
  z = x2 + 1
  y = ppermute_done((*sems, x2, y))
  return y, z

現在,XLA 看不到 x 的單獨消費者,因此不再引入副本。但是,這有一個主要的缺點,因為它迫使我們解壓縮來自 ppermute_start 的 future。它將生命週期問題與複製問題聯繫起來。

迴圈別名#

讓我們考慮一個稍微更進階的範例。讓我們實作一個使用 while_loopppermute 在環中傳送值的函式。

def f(x):
  def body(i, x):
    fut = ppermute_start(x)
    y = ppermute_done(fut)
    return y
  return fori_loop(0, 8, body, x)

fori_loop 的一個實作細節是輸入和輸出緩衝區會自動彼此別名。請注意,我們正在 ppermute_startppermute_done 操作中設定一些額外的別名。讓我們透過為程式中的每個值著色來執行我們自己的「緩衝區指派」,以確定我們需要多少個唯一緩衝區。

首先,我們將解壓縮具有別名 xout 緩衝區的 fut 元組。

def f(x):
  def body(i, x):
    *sems, x, y = ppermute_start(x)
    y = ppermute_done(*sems, x, y)
    return y
  return fori_loop(0, 8, body, x)

現在讓我們根據指派給它們的唯一緩衝區為每個值著色。我們有來自 fori_loop 的輸入/輸出別名、來自 ppermute_startx 別名和來自 ppermute_doney 別名。

def f(x):
  def body(i, x):
    *sems, x, y = ppermute_start(x)
    y = ppermute_done((*sems, x, y))
    return y
  return fori_loop(0, 8, body, x)

如果您執行別名分析,您會發現所有緩衝區都已著色相同!直覺上,這是成問題的,因為如果我們正在執行 ppermute 的迴圈,我們無法寫入我們正在傳送到的同一個緩衝區。我們通常需要一個額外的 (即「雙重」) 緩衝區來接收,然後通常我們會在下一次迭代中切換傳送/接收緩衝區。XLA 在實務中會做的是,它會觀察緩衝區重新使用並防禦性地插入副本。

def f(x):
  def body(i, x):
    x = copy(x)
    *sems, x, y = ppermute_start(x)
    y = ppermute_done((*sems, x, y))
    return y
  return fori_loop(0, 8, body, x)

此副本表示 xy 不再彼此別名,並且程式將是正確的。但是,我們需要此副本嗎?我們如何引入雙重緩衝區以避免每次迭代都進行昂貴的副本?答案是展開!

我們將手動展開我們的程式碼。

def f(x):
  def body(i, x):
    *sems, x, x2 = ppermute_start(x)
    x2 = ppermute_done((*sems, x, x2))
    
    *sems, x2, y = ppermute_start(x2)
    y = ppermute_done((*sems, x2, y))
    return y
  return fori_loop(0, 4, body, x)

現在,如果我們要執行相同的別名分析,我們會發現緩衝區不再彼此別名,並且我們不需要插入防禦性副本即可正確。

因此,移除這些副本的簡單解決方案是使用 fori_loop 和 `unroll >= 2`。

def f(x):
  def body(i, x):
    fut = ppermute_start(x)
    y = ppermute_done(fut)
    return y
  return fori_loop(0, 8, body, x, unroll=2)

這足以實作此迴圈而無需額外副本!

跨迴圈邊界傳遞 future#

現在讓我們看一個更進階的範例。我們將實作與之前相同的程式,但會錯開迴圈,我們會在迴圈之前的序言中開始 ppermute,並在迴圈開始時等待 ppermute

def f(x):
  fut = ppermute_start(x)
  def body(i, fut):
    x = ppermute_done(fut)
    fut = ppermute_start(x)
    return fut
  fut = fori_loop(0, 7, body, fut)
  return ppermute_done(fut)

在此範例中,我們傳遞的是 future 值,而不是將值 x 從一個迴圈傳遞到另一個迴圈。

讓我們再次解壓縮 future 以查看發生了什麼事。

def f(x):
  fut = ppermute_start(x)
  def body(i, fut):
    *sems, x, out = fut
    x = ppermute_done((*sems, x, out))
    (*sems, x, out) = ppermute_start(x)
    return (*sems, x, out)
  (*sems, x, out) = fori_loop(0, 7, body, x)
  return ppermute_done((*sems, x, out))

因此,我們正在明確地將信號量、輸入緩衝區和目標輸出緩衝區作為迴圈進位進行執行緒化。如果我們現在執行別名分析會發生什麼事?嗯,我們將遇到與上一節相同的別名問題,其中 xout 將彼此別名。XLA 將引入一個副本。

def f(x):
  fut = ppermute_start(x)
  def body(i, fut):
    *sems, x, out = fut
    out = copy(out)
    x = ppermute_done((*sems, x, out))
    (*sems, x, out) = ppermute_start(x)
    return (*sems, x, out)
  (*sems, x, out) = fori_loop(0, 7, body, x)
  return ppermute_done((*sems, x, out))

在這種情況下,我們在 out 上插入了一個副本。但是,這是一個非常糟糕的情況,因為 out 正在被主動複製到其中!即使我們在 x 上插入一個副本,我們也會遇到問題,因為這樣 x 的生命週期將不會延長到 ppermute_done。這非常非常糟糕!我們不僅會得到副本,而且還會得到不正確的結果!

正如我們之前觀察到的,解決方案是透過展開來避免別名所有緩衝區,從而避免副本。所以,如果我們這樣做

def f(x):
  fut = ppermute_start(x)
  def body(i, fut):
    x = ppermute_done(fut)
    fut = ppermute_start(x)
    return fut
  fut = fori_loop(0, 7, body, x, unroll=2)
  return ppermute_done(fut)

我們的程式現在應該是正確的。

整合在一起#

因此,我們提出了一些經驗法則

  1. 如果我們的操作依賴於 ppermute 的輸入值,請解壓縮 future 以使用別名值而不是原始值。

  2. 在迴圈主體中執行 ppermute 時,請使用 `unroll >= 2`。

讓我們將所有內容組合到一個函式中,該函式在迴圈中執行 ppermute 並累積結果。

def f(x):
  out = jnp.zeros_like(x)
  fut = (*sems, x, out) = ppermute_start(x)
  out = out + x
  def body(i, carry):
    out, fut = carry
    x = ppermute_done(fut)
    fut = (*sems, x, out) = ppermute_start(x)
    out = out + x
    return out, fut
  out, fut = fori_loop(0, 7, body, (out, fut), unroll=2)
  return out, ppermute_done(fut)

請注意,在此範例中,我們不需要 optimization_barrier,因為迴圈邊界充當排程屏障,將 startdone 分開。

就是這樣,我們完成了!這將是 Pallas 中用於執行非同步操作的官方 API。謝謝大家!任務完成!

或者真的是這樣嗎?

狀態的反擊#

雖然看起來我們透過使用一些聰明的技巧來解決了副本和不正確的問題,但我們仍然處於尷尬的境地。此 API 功能強大,但有許多陷阱和注意事項。可能還有更多邊緣情況需要我們處理,甚至需要深入了解 XLA 才能預測或理解。我們應該發布這樣的 API 嗎?還是有其他替代方案?

嗯,答案可能一直就在我們眼前。

讓我們再完整地執行一次這個練習,除了,這次我們來寫具狀態的版本。這表示我們每個自訂的非同步操作現在都將對 Ref 而非值進行操作。

def ppermute_start_stateful(x_ref, y_ref) -> tuple[Semaphore, Semaphore]:
  ...

def ppermute_done_stateful(send_sem, recv_sem, x_ref, y_ref) -> None:
  ...

假設我們可以在 Pallas 中實作這些,並看看我們的新程式會是什麼樣子。讓我們從一個基本的集體置換 (collective permute) 開始

def f(x):
  x_ref = make_ref(x)
  y_ref = make_ref(zeros_like(x))
  fut = ppermute_start_stateful(x_ref, y_ref)
  ppermute_done_stateful(*fut, x_ref, y_ref)
  return y_ref[...]

它比我們原始的基於值的版本稍微冗長一些,但它有一些關鍵的差異。第一個是我們建立一個「空的」Ref 來接收 ppermute 的結果,這與基於值的版本不同,後者會為我們建立一個值。一個很棒的地方是 x_ref 的生命週期在這裡很清楚:它會持續到 ppermute_done_stateful。我們不需要像以前那樣「偷偷地」將 x 值塞進操作中。

另一個差異變得更明顯,當我們嘗試在 start/done 之間添加一個操作時。

def f(x):
  x_ref = make_ref(x)
  y_ref = make_ref(zeros_like(x))
  fut = ppermute_start_stateful(x_ref, y_ref)
  x_ref[...] += 1
  ppermute_done_stateful(*fut, x_ref, y_ref)
  return y_ref[...]

之前,我們遇到了排程不明確的問題,其中 XLA 可能會相對於 ppermute 重新排序 add 操作。使用具狀態的語義,我們實際上加入了排序約束!x_ref[...] += 1 會修改 x_ref,因此它不能相對於 ppermute_done_stateful 移動。JAX 可以在降低到 HLO 的過程中注入這些排程約束。

最後一個關鍵差異很明顯,當我們嘗試迴圈範例時。

def f(x):
  x_ref = make_ref(x)
  y_ref = make_ref(zeros_like(x))
  def body(i, _):
    fut = ppermute_start_stateful(x_ref, y_ref)
    ppermute_done_stateful(*fut, x_ref, y_ref)
    # Now switch to y_ref -> x_ref
    fut = ppermute_start_stateful(y_ref, x_ref)
    ppermute_done_stateful(*fut, y_ref, x_ref)
  fori_loop(0, 8 // 2, body, None)
  return x_ref[...]

因為我們需要一個單獨的緩衝區來接收 ppermute 的結果,我們被迫以展開迴圈的方式編寫程式碼!沒有辦法在 XLA 中編寫需要複製的版本,因為那會涉及到一個從 Ref 發送到自身的 ppermute,這實際上沒有意義。

為了在不手動展開迴圈的情況下處理這個問題,我們會建立一個前緣維度為 2 的暫存緩衝區,它在迭代之間充當發送/接收目標,並在每次迭代時切換。這與我們在 Pallas 核心程式內部使用的模式相同,當編寫手動重疊的核心程式時。

這裡的體認是,具狀態的特性迫使我們更早地處理許多問題,這些問題在使用基於值的語義時會浮現。我們從一開始就避免了這些問題!

  1. 排程 - 以 Ref 作為輸入的具狀態操作會強制程式的執行順序。請注意,這將會對同一個 Ref 上的操作彼此之間進行排程。我們可能還需要一個 opt_barrier_stateful 來強制執行更多排序約束。

  2. 生命週期 - Ref 的生命週期可以透過 run_state 來劃定範圍,或者可以作為具狀態操作的輸入。

  3. 防禦性複製 - 使用 Ref 迫使我們「手動」處理緩衝區分配,而降低過程可以確保別名機制正常運作,以避免任何複製。

另一個重要的基本限制是,我們最終會階段性地輸出一個 HLO 程式,其中存活的緩衝區和信號量被表示為陣列值類型。XLA 不保證這些中間值的緩衝區生命週期或它們所在的記憶體空間。因此,即使 Pallas 核心程式正在積極地將資料複製到陣列值中,XLA 仍有可能複製這些陣列值。 這在 HLO 中很容易驗證,但這是使用自訂呼叫來表示 HLO 中的非同步操作的一個缺點。

結論#

我們已經討論了一些關於 Pallas 和 JAX 中非同步操作的棘手挑戰。Ref 似乎是一種很有希望的方式來表示這些操作,它可以規避一些在使用基於值的語義時出現的問題。然而,一個缺點是它將具狀態的 JAX 放在首要位置,這是我們在 Pallas 之外尚未做過的事情。值得思考的是,我們應該教育使用者關於具狀態操作的知識,還是提供一個更危險的 API。我們也不知道我們想做的每件事是否都可以透過 Ref 來表達。我們也應該集思廣益,尋找狀態的替代方案,以擴展設計空間。例如,如果 XLA 提供了一個尊重生命週期的第一級 futures API,並且它可以自動執行諸如使用 futures 的雙緩衝迴圈之類的操作?這可能是一個可行的替代方案,但權衡之處在於將更多控制權交給編譯器,而不是由使用者明確控制。