jax.make_array_from_process_local_data#

jax.make_array_from_process_local_data(sharding, local_data, global_shape=None)[源代码]#

使用进程中可用的数据创建分布式张量。

此函数是 make_array_from_callback 的一个常见特例。它假设数据在进程中可用,并处理索引操作。

最常见的情况是分片跨批次维度进行,每个主机只加载其对应的子批次。此函数也支持更一般的情况,例如混合多主机和多轴复制和分片,但您需要正确计算进程本地数据的大小和内容,以满足分片约束。

特别是,如果任何两个主机是副本,则 host_local_data 也应相同。

global_shape 是可选的。如果未提供,则会从 local_data 和分片中推断出来,前提是每个主机仅代表其自己的数据以进行均匀分片。如果分片不均匀(请参见下面的注释),则会引发异常。

显式设置 global_shape 可以进行更精细的控制,并适用于不均匀的分片。global_shape 的每个维度都必须与 host_local_data 匹配,或者与分片的推断全局形状匹配(在这种情况下,它等效于将其设置为 None,但更明确)。

例如,如果维度 i 完全分片,则此大小将为 per_device_shape[i] * jax.local_device_count()。每个设备将映射到 local_data 数组的本地切片中。例如,如果给定的进程地址切片为 (8, 12) 和 (24, 28),则这些切片将映射到 local_data 的 (0, 4) 和 (4, 8)。

对于 global_shapes 与 local_shape 匹配的每个维度,每个设备将在 local_data 中查找切片。例如,如果 global_shape == local_data.shape,则假定本地数据是要分片到设备的实际目标数组。

如果 global_shape 与 local_data.shape 相同,则所有主机的数据必须相同。

示例

>>> from jax.sharding import PartitionSpec as P
>>> mesh_rows = 2
>>> mesh_cols =  jax.device_count() // 2
...
>>> mesh = jax.sharding.Mesh(np.array(jax.devices()).reshape(mesh_rows, mesh_cols), ('x', 'y'))
>>> sharding = jax.sharding.NamedSharding(mesh, P(('x', 'y'),))
>>> rows_per_device = 2
>>> feature_length = 32
>>> per_device_shape = (rows_per_device, feature_length)
>>> per_host_shape = (rows_per_device * len(mesh.local_devices), feature_length)
>>> per_host_generator = lambda : np.arange(np.prod(per_host_shape)).reshape(per_host_shape)
>>> per_host_data = per_host_generator()  # replace with your own per-host data pipeline that outputs numpy arrays
>>> global_shape = (rows_per_device * len(sharding.device_set), ) + per_device_shape[1:]
>>> output_global_array = jax.make_array_from_process_local_data(sharding, per_host_data, global_shape)
...
>>> assert output_global_array.addressable_data(0).shape == per_device_shape
>>> assert output_global_array.shape == global_shape

注意:虽然大多数分片是均匀的,但可以设计一种特殊的分片网格,其中每个进程的设备在某些维度上以非网格状模式排列,或者索引以非平凡的方式重叠。这种分片在这些维度上被称为“不均匀”。在这种情况下,沿这些方向的全局形状必须与局部形状匹配,因为没有有意义的方法以非重叠方式表示所有需要的每个进程的数据。例如,对于全局形状 4x4,如果分片如下所示

0123 2103 4675 4567

有 4 个进程,分别包含设备 (0,1)、(2, 3)、(4, 5)、(6, 7)。那么每个主机的数据如下所示

xx.. ..xx …. …. .xx. x..x …. …. …. …. x..x .xx. …. …. xx.. ..xx

分片在行上是均匀的(每个主机需要第 1-2 行或第 3-4 行),在列上是不均匀的(主机需要重叠但不匹配的列集)。因此,对于所有主机,局部数据必须具有 2x4 或 4x4 的形状,即使每个主机都可以潜在地容纳在 2x2 的形状中。在这种情况下,用户必须显式提供 global_shape,并且对于 local_shape=(2, 4),潜在有效的全局形状为 (2, 4) 和 (4, 4)。

另一方面,对于分片

0213 x.x. .x.x. …. …. 0213 x.x. .x.x. …. …. 4657 …. …. .x.x x.x. 4657 …. …. .x.x x.x.

对于 local_shape=(2, 2),此函数可以接受 2x2、2x4、4x2 和 4x4 全局形状的选择。在这种情况下,将 global_shape 设置为 None,等效于将其设置为 (4, 4)。

参数:
  • sharding (Sharding) – 全局数组的分片。

  • local_data (np.ndarray) – 主机上要放置在本地设备上的数据。每个维度应与 global_shape 匹配,或与 num_addressable_indices(dim) 匹配。

  • global_shape (Shape | None | None) – 全局数组的目标形状。如果为 None,将从 local_data 和分片中推断。

返回:

将具有 sharding=sharding 和形状 global_shape 的张量。

返回类型:

ArrayImpl