o
    i[F                     @   s  d dl Z d dlZd dlmZ d dlmZ d dlZd dlmZ d dl	m
Z
 d dlmZmZmZ g dZe eZ	 dZG d	d
 d
ZG dd deZeedejZd ZG dd dZG dd dZde
dedee
 fddZdejdededeej fddZ dd Z!		d'de"edf de#e$ef dB d ed!e"edf dB d"e#e$ef dB de"ee" ee# f fd#d$Z%d ee fd%d&Z&dS )(    N)Sequence)Anymap_aggregate)	BlockMask)tree_flattentree_maptree_unflatten)TensorChunkSpecsplit_args_kwargs_into_chunksmerge_chunksFc                   @   s   e Zd ZdZdd ZdS )_CustomReducera$  
    Custom reducer class that can be used to specify a custom operation that
    reduces losses of multiple microbatches into one value.

    Example:
    >>> # xdoctest: +SKIP
    >>> sum_reducer = _CustomReducer(
    >>>     torch.tensor(0.0),
    >>>     lambda a, b: a + b
    >>> )
    c                 C   s   || _ || _d S N)
init_value	reduce_fn)selfr   r    r   i/sda-disk/www/egybert/egybert_env/lib/python3.10/site-packages/torch/distributed/pipelining/microbatch.py__init__+   s   
z_CustomReducer.__init__N)__name__
__module____qualname____doc__r   r   r   r   r   r      s    r   c                   @      e Zd ZdS )_LossReducerNr   r   r   r   r   r   r   r   0       r   g        c                   @   sf   e Zd ZU dZdd Zeed< dd Zdd Ze	d	e
ed
f fddZe	d	eeef fddZdS )r
   z2
    Class used to specify chunking of inputs
    c                 C   s
   || _ d S r   	split_dim)r   r   r   r   r   r   @   s   
zTensorChunkSpec.__init__r   c                 C   s    | j j d| j j d| j dS )N.())	__class__r   r   r   r   r   r   r   __repr__E   s   zTensorChunkSpec.__repr__c                 C   s   d| j  dS )NzTensorChunkSpec(r!   r   r#   r   r   r   __str__J   s   zTensorChunkSpec.__str__
chunk_dims.c                 C      t | dd }|S )a  
        A helper for creating a tuple of `TensorChunkSpec` from a tuple of chunk
        dimensions (int's).
        Example:
            >>> # xdoctest: +SKIP
            >>> # There are three positional arguments to the model, and
            >>> # we are chunking them along dimension 0, 0 and 1, respectively
            >>> args_chunk_spec = TensorChunkSpec.from_tuple((0, 0, 1))
        c                 S      t | S r   r
   dimr   r   r   <lambda>\       z,TensorChunkSpec.from_tuple.<locals>.<lambda>r   )r&   args_chunk_specr   r   r   
from_tupleM   s
   zTensorChunkSpec.from_tuplec                 C   r'   )a\  
        A helper for creating a dictionary of `TensorChunkSpec` from a
        dictionary of chunk dimensions (int's).
        Example:
            >>> # xdoctest: +SKIP
            >>> # Chunk dimension 0 for the "id" argument, 1 for the "mask" argument
            >>> kwargs_chunk_spec = TensorChunkSpec.from_dict({"id": 0, "mask": 1})
        c                 S   r(   r   r)   r*   r   r   r   r,   n   r-   z+TensorChunkSpec.from_dict.<locals>.<lambda>r   )r&   kwargs_chunk_specr   r   r   	from_dict`   s
   zTensorChunkSpec.from_dictN)r   r   r   r   r   int__annotations__r$   r%   staticmethodtupler/   dictstrr1   r   r   r   r   r
   ;   s   
 

r
   c                   @   r   )
_ReplicateNr   r   r   r   r   r8   t   r   r8   
block_mask
num_chunksreturnc                    s   j ddkr g| S  j d|ksJ dd}t j ||}t j||} jdur8t j||ndg| } jdurJt j||ndg| }g }d}t|D ],}	 fdd}
|t	j
||	 ||	 ||	 ||	  j|
| jd |||	 d7 }qW|S )a	  Given a block mask, split the block mask along the batch dimension (dim0).

    Args:
        block_mask: Block mask to split
        num_chunks: Number of chunks to split the block mask into

    Returns:
        chunk_block_masks: List of chunked block masks
    r      z;Block mask has fewer batch size than the number of chunks. Nc                    s    fdd}|S )Nc                    s    t | } | | |||S r   )torch	full_likemask_mod)bhq_idxkv_idxb_offset)r9   idxr   r   batch_offset_mask_mod   s   zI_split_block_mask.<locals>.create_mask_mod.<locals>.batch_offset_mask_modr   )rE   rF   r9   )rE   r   create_mask_mod   s   z*_split_block_mask.<locals>.create_mask_mod)kv_num_blocks
kv_indicesfull_kv_num_blocksfull_kv_indices
BLOCK_SIZEr?   seq_lengths)rI   sizer=   tensor_splitrJ   rK   rL   rangeappendr   from_kv_blocksrM   rN   )r9   r:   	batch_dimkv_num_blocks_chunkskv_indices_chunksfull_kv_num_blocks_chunksfull_kv_indices_chunkschunk_block_masksbatch_offset	chunk_idxrH   r   rG   r   _split_block_maskx   sF   


r\   tensorspecc           
      C   s   |  |j|ksJ d|  |j dt| ||j}ts |S g }d}|D ]2}t| }|| |j }tdddg|j }	t|||	|j< |||	< || || |j7 }q&|S )zGiven a tensor, and a chunking spec, split the tensor.
    Args:

        tensor: Tensor to split
        spec: Chunking spec
        num_chunks: Number of chunks to split the tensor into

    Returns:
        chunk_tensors: List of chunked tensors
    zTensor size z is smaller than num_chunksr   N)	rO   r   r=   rP   _debug_mask_minibatches
zeros_likeslicendimrR   )
r]   r^   r:   chunk_tensorsexpanded_chunkssplit_dim_idxchunk_tensornew_val	upper_idxslice_indicesr   r   r   _split_tensor   s"   

rj   c                    s.  | sdd t |D S t| t|ks%J dt|   dt|  |dus+J t| dd d\} t|d	d d\}}g }t||d
dD ]a\}}|tu sUt|tr[|| qHt|t	j
rrt|tshJ |||j qHt|trt|ts~J |jdksJ d|jddkr|| qH||jd qHtd| d| dtg ||R  }	dd t |	D }
t||d
dD ]J\}}g }|tu st|tr|g|	 }n#t|t	j
rt|||	}nt|trt||	}ntd| d| dt|
|d
dD ]
\}}|| qqÇ fdd|
D S )aW  
    Given a dictionary of args, and a dictionary of chunking specs, shard the
    args according to the chunking specs.

    Args:
        args_dict: Dictionary of args
        args_chunk_spec: Dictionary of chunking specs
        num_chunks: Number of chunks to shard the args into

    Returns:
        args_split: List of sharded args
    c                 S   s   g | ]}i qS r   r   .0_r   r   r   
<listcomp>       z'_shard_dict_of_args.<locals>.<listcomp>zargs_dict.keys() = z args_chunk_spec.keys() = Nc                 S   
   t | tS r   
isinstancer   xr   r   r   r,         
 z%_shard_dict_of_args.<locals>.<lambda>is_leafc                 S   rp   r   rq   rs   r   r   r   r,      ru   Tstrictr   z#BlockMask only supports split_dim=0r<   zUnsupported chunk spec: z and value: z combination.c                 S   s   g | ]}g qS r   r   rk   r   r   r   rn     ro   c                    s   g | ]}t | qS r   )r	   )rl   _flat_split_result	tree_specr   r   rn   .  s    )rQ   lenlistkeysr   zipr8   rr   rR   r=   Tensorr
   rO   r   r   rI   
ValueErrorminrj   r\   )	args_dictr.   r:   valueschunk_specsrm   split_sizesvr^   result_num_chunksflat_split_resultsv_splitsrz   _v_splitr   r{   r   _shard_dict_of_args   sf   





r   args.kwargschunksr.   r0   c           
      C   s   |du ri }dd }|du rt || dd d}|du r$t ||dd d}ttt| tt||}t|}t|||}t||k rTt|}ttt| tt||}t|t|krjtdt| d	t| d
d |D }	|	|fS )a  
    Given a sequence of args and kwargs, split them into a number of chunks
    according to  their respective chunking specs.

    Args:
        args: Tuple of args
        kwargs: Dict of kwargs
        chunks: Number of chunks to split the args and kwargs into
        args_chunk_spec: chunking specs for args, in same shape as args
        kwargs_chunk_spec: chunking specs for kwargs, in same shape as kwargs

    Returns:
        args_split: List of sharded args
        kwargs_split: List of sharded kwargs
    Nc                 S   s   t | tjtB rttS t S r   )rr   r=   r   r   r
   DEFAULT_CHUNK_DIMr8   r   r   r   r   default_specq  s   z3split_args_kwargs_into_chunks.<locals>.default_specc                 S   rp   r   rq   r   r   r   r   r,   y  ru   z/split_args_kwargs_into_chunks.<locals>.<lambda>rv   c                 S   rp   r   rq   r   r   r   r   r,   ~  ru   z;args and kwargs are split into different number of chunks: z, c                    s*   g | ] t  fd dtt D qS )c                 3   s    | ]} | V  qd S r   r   )rl   i
chunk_argsr   r   	<genexpr>  s    z;split_args_kwargs_into_chunks.<locals>.<listcomp>.<genexpr>)r5   rQ   r}   )rl   r   r   r   rn     s    z1split_args_kwargs_into_chunks.<locals>.<listcomp>)r   r   r6   	enumerater}   RuntimeError)
r   r   r   r.   r0   r   args_split_dictreal_num_chunkskwargs_split
args_splitr   r   r   r   4  sR   8





r   c                    s6  |durt |\}}nt | d \}}ttgt| }g | D ]}t |\}}t|t|kr:td| d| | q g }t|D ]\ }	t|	trӇ fddttD }
t	r|
d j
}|
dd D ]	}|j
|kssJ qjtjtj|dd	it|
|	jd
}g }d}t|
t|ksJ t|
|ddD ])\}}|||	j }tdddg|j }t||||	j< || }|| |}qn|
}|tj||	jd qFt|	tr|	j}ttD ]}|	||   }q|| qFd   }tdtD ]}|   |ksJ q|| qFt||S )z
    Given a list of chunks, merge them into a single value according to
    the chunk spec.

    Args:
        chunks: list of chunks
        chunk_spec: Chunking spec for the chunks

    Returns:
        value: Merged value
    Nr   zChunk z did not match chunk spec c                    s   g | ]}|   qS r   r   )rl   r[   arg_idxchunks_flattenedr   r   rn     s    
z merge_chunks.<locals>.<listcomp>r<   devicemeta)sectionsr+   Trx   r*   )r   r
   r   r}   r   rR   r   rr   rQ   r_   shaper=   rP   emptyr   r   rO   ra   rb   catr   r   r   r	   )r   
chunk_specspec_flattenedflatten_specchunk0_flatchunkchunk_flattenedrm   args_flattenedargpartial_valuesoverall_shapevalmeta_chunksvalues_to_catchunk_start_idxpartial_value
meta_chunkchunk_end_idxri   slicedreduced_valr[   valuer   r   r   r     sh   -





r   )NN)'loggingoperatorcollections.abcr   typingr   r=   torch.fx.noder   !torch.nn.attention.flex_attentionr   torch.utils._pytreer   r   r	   __all__	getLoggerr   loggerr_   r   r   r]   addsum_reducerr   r
   r8   r2   r~   r\   r   rj   r   r5   r6   r7   r   r   r   r   r   r   <module>   sj   
9
@
)W

s