o
    0i                  	   @   sF  d dl Z d dlZd dlZd dlZd dlZd dlZd dlmZ d dlm	Z	 d dl
mZ d dlmZ d dlmZmZ d dlZddlmZ d	d
lmZmZmZmZ d	dlmZ d	dlmZmZ d	dlmZm Z m!Z! d	dl"m#Z# ee$Z%dd Z&dd Z'dFddZ(dFddZ)dd Z*dGddZ+dGddZ,dHddZ-dGd d!Z.dId#e/d$e/d%e0fd&d'Z1	dJd#e/d(e/d%e0d)e0fd*d+Z2d,ej3j4d-ej5fd.d/Z6d,ej3j4d0e7fd1d2Z8d3ej9j:d4e7fd5d6Z;d,ej3j4fd7d8Z<d,ej3j4d9ej3j4fd:d;Z=d,ej3j4d9eej3j4ge0f fd<d=Z>d>d? Z?d@e7d9e7fdAdBZ@dCee	ej3j4 e/f d9eAej3jB fdDdEZCdS )K    N)defaultdict)Iterable)nullcontext)Path)CallableUnion   )
get_logger   )FSDP_MODEL_NAMEOPTIMIZER_NAMESAFE_WEIGHTS_NAMEWEIGHTS_NAME)get_module_class_from_name)get_non_persistent_buffersis_peft_model)get_module_children_bottom_upis_compiled_modulesave)is_torch_versionc                   C   s"   dt jvr
dt jd< dt jd< dS )z[
    Enables RAM efficient loading of Hugging Face models for FSDP in the environment.
    ACCELERATE_USE_FSDPTrueFSDP_CPU_RAM_EFFICIENT_LOADINGNosenviron r   r   ]/sda-disk/www/egybert/egybert_env/lib/python3.10/site-packages/accelerate/utils/fsdp_utils.py!enable_fsdp_ram_efficient_loading'   s   

r   c                   C   s   dt jd< dS )z\
    Disables RAM efficient loading of Hugging Face models for FSDP in the environment.
    Falser   Nr   r   r   r   r   "disable_fsdp_ram_efficient_loading1   s   r    Fc                 C   sN   |rt | rddlm} || | jdS |d ur#ddlm} || |dS |  S )Nr   )get_peft_model_state_dictadapter_name)get_model_state_dictoptions)r   peftr!   active_adapter'torch.distributed.checkpoint.state_dictr$   
state_dict)modeladapter_only
sd_optionsr!   r$   r   r   r   _get_model_state_dict8   s   r.   c                 C   sT   |rt | rddlm} || || jdS |d ur%ddlm} || ||dS | |S )Nr   )set_peft_model_state_dictr"   )set_model_state_dictr%   )r   r'   r/   r(   r)   r0   load_state_dict)r+   r*   r,   r-   r/   r0   r   r   r   _set_model_state_dictG   s   
r2   c                 C   sT   d }| j dkr(ddlm} ddlm} || j|jkt| jddt| jddd}|S )	Nr   r   )StateDictOptionsStateDictTypeoffload_to_cpuF
rank0_only)full_state_dictcpu_offloadbroadcast_from_rank0)	fsdp_versionr)   r3   2torch.distributed.fsdp.fully_sharded_data_parallelr5   state_dict_typeFULL_STATE_DICTgetattrstate_dict_config)fsdp_pluginr-   r3   r5   r   r   r   _prepare_sd_optionsV   s   

rB   c                 C   sd  dd l m  m} ddlm} ddlm} ddlm}	 tj	|dd | j
|	jkr5|jdk}
|
| j_|
| j_| jdkrE|
|| j
| j| jnt }t| }| t|||d}| j
|	jkr|dkret d	nt d
| d	}tj||}|jdkrtd|  t|| td|  n| j
|	jkr|dkrt d|j d	nt d
| d|j d	}tj||}td|  t|| td|  nR| j
|	jkrtj|t d
| }tj	|dd td|  d|i}|j|||| d td|  W d    d S W d    d S W d    d S W d    d S 1 s+w   Y  d S )Nr   DefaultSavePlannerFullyShardedDataParallelr4   Texist_okr
   r,   r-   .bin_zSaving model to zModel saved to _rankr+   r*   storage_writerplanner) torch.distributed.checkpointdistributed
checkpoint,torch.distributed.checkpoint.default_plannerrD   r<   rF   r5   r   makedirsr=   r>   num_processesr@   r6   r7   r;   optim_state_dict_configr   rB   r.   r   pathjoinprocess_indexloggerinfotorchr   LOCAL_STATE_DICTSHARDED_STATE_DICTFileSystemWriter)rA   acceleratorr+   
output_dirmodel_indexr,   dist_cprD   FSDPr5   is_multi_processctxr-   r*   weights_nameoutput_model_fileckpt_dirr   r   r   save_fsdp_modelg   sj   

"
 $rj   c                 C   s  dd l m  m} ddlm} ddlm} ddlm}	 |  | j	|	j
kr2|jdk}
|
| j_|
| j_| jdkrB|	|| j	| j| jnt }t| }| | j	|	j
krt||uru|jdkru|jsu| jsl| jdkrltd	 W d    d S |dkr~t dnt d| d}tj||}td	|  |j p|j}|rtj |d
d}ni }td|  n| j	|	j!kr|dkrt d|j dnt d| d|j d}tj||}td	|  tj |d
d}td|  nD| j	|	j"kr3t |vrtj|t d| n|}td	|  dt#|||di}|j ||$|| d |d }td|  t%||||d}W d    |S 1 sGw   Y  |S )Nr   )DefaultLoadPlannerrE   r4   r
   zzSet the `sync_module_states` flag to `True` so that model states are synced across processes when initializing FSDP objectrJ   rK   zLoading model from Tweights_onlyzModel loaded from rL   r+   rI   )r*   storage_readerrO   )&rP   rQ   rR   rS   rk   r<   rF   r5   wait_for_everyoner=   r>   rU   r@   r6   r7   r;   rV   r   rB   typerY   is_fsdp2sync_module_states
ValueErrorr   r   rW   rX   rZ   r[   is_main_processr\   loadr]   r^   r.   FileSystemReaderr2   )rA   r`   r+   	input_dirrb   r,   rc   rk   rd   r5   re   rf   r-   rg   input_model_file
load_modelr*   ri   load_resultr   r   r   load_fsdp_model   sz   

"	
..r{   c                 C   s  dd l m  m} ddlm} ddlm} ddlm}	 tj	|dd | j
dkr2||| j| j| jnt }
t| }|
 | j
dkrOdd	lm} ||||d
}n|||}| j|	jkr|jdkr|dkrit dnt d| d}tj||}td|  t|| td|  n9tj|t d| }tj	|dd td|  |jd|i||| d td|  W d    d S W d    d S 1 sw   Y  d S )Nr   rC   rE   r4   TrG   r
   r   )get_optimizer_state_dictr%   rJ   rK   zSaving Optimizer state to zOptimizer state saved in 	optimizerrM   )rP   rQ   rR   rS   rD   r<   rF   r5   r   rT   r;   r=   r@   rV   r   rB   r)   r|   optim_state_dictr>   rY   r   rW   rX   rZ   r[   r\   r   r_   )rA   r`   r}   r+   ra   optimizer_indexrc   rD   rd   r5   rf   r-   r|   optim_stateoptim_state_nameoutput_optimizer_fileri   r   r   r   save_fsdp_optimizer   sL   


 " r   c                 C   s  dd l m  m} ddlm} ddlm}	 |  | jdkr)||| j| j	| j
nt }
t| }|
 | j|	jkrtd }|jdksD| j
jss|dkrMt dnt d| d}tj||}td|  tj|dd	}td
|  n8t |vrtj|t d| n|}td|  d| i}|j||||d |d }td|  | jdkr|j|||d}|| nddlm} |||||d W d    d S W d    d S 1 sw   Y  d S )Nr   rE   r4   r
   rJ   rK   zLoading Optimizer state from Trl   zOptimizer state loaded from zLoading Optimizer from r}   )checkpoint_idrn   zOptimizer loaded from )r+   optimr~   )set_optimizer_state_dictr%   )rP   rQ   rR   r<   rF   r5   ro   r;   r=   r@   rV   r   rB   r>   rY   r7   r   r   rW   rX   rZ   r[   r\   ru   r*   rv   optim_state_dict_to_loadr1   r)   r   )rA   r`   r}   r+   rw   r   r,   rc   rd   r5   rf   r-   r   optimizer_nameinput_optimizer_fileri   flattened_osdr   r   r   r   load_fsdp_optimizer  sV   
 

"r   Tcheckpoint_dir	save_pathsafe_serializationc                 C   s   ddl m  m} ddlm  m  m} i }t|}|jdd |j||| |	 dd |r5|t
 n|t }t| dkrI|t|d  }t|||d |S )z
    Passthrough to `torch.distributed.checkpoint.format_utils.dcp_to_torch_save`

    Will save under `save_path` as either `model.safetensors` or `pytorch_model.bin`.
    r   NTrG   )rn   rO   no_distr
   )r   )rP   rQ   rR   )torch.distributed.checkpoint.format_utilsformat_utilsr   mkdir_load_state_dictrv   _EmptyStateDictLoadPlannerr   r   lenkeyslistr   )r   r   r   rc   dist_cp_format_utilsr*   r   r   r   )_distributed_checkpoint_to_merged_weightsL  s    r   output_pathremove_checkpoint_dirc           
      C   s2  t | } ddlm} tddstd|  sh| d  }| d  }d|  d	}|rD|rD|d
7 }|d|  d|  d7 }|d7 }t||rV|d7 }|d|  d7 }t||rd|d7 }|d|  d7 }t|| }|jrtd|   t	| ||}	td|	  |rtd|   t
|  |  dS )a?  
    Merge the weights from sharded FSDP model checkpoints into a single combined checkpoint. Should be used if
    `SHARDED_STATE_DICT` was used for the model. Weights will be saved to `{output_path}/model.safetensors` if
    `safe_serialization` else `pytorch_model.bin`.

    Note: this is a CPU-bound process.

    Args:
        checkpoint_dir (`str`):
            The directory containing the FSDP checkpoints (can be either the model or optimizer).
        output_path (`str`):
            The path to save the merged checkpoint.
        safe_serialization (`bool`, *optional*, defaults to `True`):
            Whether to save the merged weights with safetensors (recommended).
        remove_checkpoint_dir (`bool`, *optional*, defaults to `False`):
            Whether to remove the checkpoint directory after merging.
    r   )PartialStatez>=z2.3.0z/`merge_fsdp_weights` requires PyTorch >= 2.3.0`pytorch_model_fsdp_0optimizer_0zTried to load from z) but couldn't find a valid metadata file.zE However, potential model and optimizer checkpoint directories exist.zPlease pass in either z/pytorch_model_fsdp_0 or z/optimizer_0zinstead.z8 However, a potential model checkpoint directory exists.zPlease try passing in z/pytorch_model_fsdp_0 instead.z< However, a potential optimizer checkpoint directory exists.z/optimizer_0 instead.zMerging FSDP weights from z.Successfully merged FSDP weights and saved to z"Removing old checkpoint directory N)r   accelerate.stater   r   rs   existsrt   rZ   r[   r   shutilrmtreero   )
r   r   r   r   r   model_path_existsoptimizer_path_existserrstater   r   r   r   merge_fsdp_weightsh  s<   

r   r+   devicec           	         s   t |dd }|s
S i  |D ]%}|d}d|d d |d }}||}t ||}d  t|< qdtjjf fdd}|S )N_tied_weights_keys.modulec                    s   t t}| jddD ]\}}t| v r|t| | q
| } | D ]\}}|D ]} | }|d u r>t| | |< q,t| || q,q&| S )NF)recurse)r   r   named_parametersidappenditemsr?   setattr)r   params_to_tienparamid_key_param_names
param_name_tied_paramsparam_init_fnr   r   param_init_fn_tied_param  s   	z7ensure_weights_retied.<locals>.param_init_fn_tied_param)r?   splitrX   get_submoduler   r\   nnModule)	r   r+   r   _tied_namesnamer   modr   r   r   r   r   ensure_weights_retied  s   


r   full_sdc                 C   sV  ddl m} ddlm}m} | }i }dd }dd }	| jrgt| |	 D ]=\\}
}}|j
}| |j}t||rB| }|j|d|jjd ||||j}|||
|\}}|	|||}|||
< q(n;| D ]6\}
}|j
}tj| |j|jd	}|j|d|jjd ||||j}|||
|\}}|	|||}|||
< qk|j|d
d |S )a  
    Loads the full state dict (could be only on rank 0) into the sharded model. This is done by broadcasting the
    parameters from rank 0 to all other ranks. This function modifies the model in-place.

    Args:
        accelerator (`Accelerator`): The accelerator instance
        model (`torch.nn.Module`):
            The model to load the state dict into, expected to be on meta device or a VRAM spike can occur
        full_sd (`dict`): The full state dict to load, can only be on rank 0
    r   N)DTensordistribute_tensorc           
      S   s   z|  |}W n ty"   |dd\}}| |}t||}Y nw ttd}d }|o1|jtjk}	|jj	r;|	s;|j}|d uoB|
 |fS )Nr   r
   float8_e4m3fn)get_parameter_or_bufferAttributeErrorrsplitr   r?   hasattrr\   dtyper   is_floating_pointis_contiguous)
r+   r   empty_param	old_parambase_param_namelocal_param_name	submoduleis_torch_e4m3fn_availablecasting_dtypeis_param_float8_e4m3fnr   r   r   _infer_parameter_dtype  s   

z:fsdp2_load_full_state_dict.<locals>._infer_parameter_dtypec                 S   s$   |d ur
| j |d} |r|  } | S )N)r   )to
contiguous)tensorto_contiguousr   r   r   r   _cast_and_contiguous  s
   z8fsdp2_load_full_state_dict.<locals>._cast_and_contiguous)srcgroup)r   r   T)assign)torch.distributedrQ   torch.distributed.tensorr   r   r*   rt   zipr   valuesdevice_meshdetachr   device_type
isinstanceto_local	broadcastr   WORLD
placementsr\   emptysizer   r1   )r`   r+   r   distr   r   meta_sharded_sd
sharded_sdr   r   r   
full_paramsharded_paramr   sharded_tensorr   r   full_tensorr   r   r   fsdp2_load_full_state_dict  sH   


r   r}   mappingc                    s\   ddl m} i }d||< z| jD ]} fdd|d D |d< qW dS  ty-   tdw )	a  
    Switches the parameters of the optimizer to new ones (sharded parameters in usual case). This function modifies the
    optimizer in-place.

    Args:
        optimizer (`torch.optim.Optimizer`): Optimizer instance which contains the original model parameters
        mapping (`dict`): Mapping from the original parameter (specified by `data_ptr`) to the sharded parameter

    Raises:
        KeyError:
            If a parameter in the optimizer couldn't be switched to its sharded version. This should never happen and
            indicates a bug. If we kept the original params instead of raising, the training wouldn't be numerically
            correct and weights wouldn't get updated.
    r   )r   _local_tensorc                    s   g | ]} |j  qS r   )data_ptr.0pr   r   r   
<listcomp>5  s    z5fsdp2_switch_optimizer_parameters.<locals>.<listcomp>paramszA parameter in the optimizer couldn't be switched to its sharded version. This breaks the training. Please raise an issue on GitHub.N)r   r   param_groupsKeyError)r}   r   r   accessor_mappingparam_groupr   r   r   !fsdp2_switch_optimizer_parameters  s   
r  c           	      C   s   ddl m} t| jj|}t|dddd D ]3\}}t|ddkr-|dd\}}nd}|}|r8|	|n|}||rJ||d	d
}|
|| q|S )a8  
    Applies the activation checkpointing to the model.

    Args:
        accelerator (`Accelerator`): The accelerator instance
        model (`torch.nn.Module`): The model to apply the activation checkpointing to

    Returns:
        `torch.nn.Module`: The model with the activation checkpointing applied
    r   )checkpoint_wrapperT)return_fqnsNr   r   r
   F)preserve_rng_state);torch.distributed.algorithms._checkpoint.checkpoint_wrapperr  fsdp2_prepare_auto_wrap_policyr   rA   r   r   r   r   r   register_module)	r`   r+   r  auto_wrap_policy_func
layer_namelayerparent_name
child_nameparent_moduler   r   r   fsdp2_apply_ac>  s   r  returnc                    sb  ddl m}m}m} t||pt|ot|j|}|r|S | jj}|	| |
 }t| dd}|j|j|jp9| |durE|t| jj ndt|j|| jd}	d}
| D ]\}}|jjdkrdd}
 nqV|jr|
st|ddd	 t fd
d| D }|td}t|dr|   t!||}|durt"|dd D ]}||rt||s||fi |	 qt||s||fi |	 |jrt#| || |jr	|
s	|$ D ](\}}|| j}d|v r|%dd\}}|&|}n|}|}|j'||dd qt|dr	|   t|dd}| j(dkr/|du s |tj)kr/|tj)}| j*r/t+,d |S )a"  Prepares the model for FSDP2 in-place. Also returns the model to avoid misuse of the original model.

    Args:
        accelerator (`Accelerator`): The accelerator instance
        model (`torch.nn.Module`): The model to prepare

    Returns:
        `torch.nn.Module`: Prepared model
    r   )
FSDPModuleMixedPrecisionPolicyfully_shardtorch_device_meshN)reshard_after_forwardoffload_policy	mp_policymeshignored_paramsF
Params4bitT)r   fqnsc                    s   i | ]\}}| v r||qS r   r   r   kvnon_persistent_buffer_fqnsr   r   
<dictcomp>      z'fsdp2_prepare_model.<locals>.<dictcomp>metatie_weightsr   r   r
   )
persistentr   noz~FSDP upcast of low precision parameters to fp32 (since mixed_precision != 'no') may affect the precision of model checkpoints.)-torch.distributed.fsdpr  r  r  r   r   	_orig_modr   rA   set_auto_wrap_policyr*   r?   r  r9   mixed_precision_policytupleparallelism_configfsdp_dim_namesget_parameters_from_modulesignored_modulesr   r   	__class____name__cpu_ram_efficient_loadingr   copydeepcopynamed_buffersr   r\   r   r&  r	  r   r   r   r   r   register_buffermixed_precisionfloat32rt   warningswarn)r`   r+   r  r  r  is_type_fsdpfsdp2_pluginoriginal_sdr  fsdp2_kwargsmodel_has_params4bitr   r   original_non_persistent_buffersr  r   fqnbuffer_tensor
parent_fqnlocal_buffer_namer  model_dtyper   r!  r   fsdp2_prepare_model_  sv   



	
	


"rH  c           
         s   ddl m}m}  j}t|tjr|j}||u r^t|dd}|du r$g }t	|} j
dur0 j
}t |D ]}t||}|du rHtd| d| q5dtjjdtf fd	d
}	|	S ||u rqdtjjdtf fdd
}	|	S dS )a!  Prepares the auto wrap policy based on its type, done to mimic the behaviour of FSDP1 auto wrap policy.

    Args:
        fsdp2_plugin (`FullyShardedDataParallelPlugin`):
            Instance of `FullyShardedDataParallelPlugin` containing the configuration options
        auto_wrap_policy_type (`str`):
            Either `transformer` or `size`
        model (`torch.nn.Module`):
            The model to wrap

    Returns:
        `Callable[[torch.nn.Module], bool]`:
            The auto wrap policy function to be applied to the model
    r   )size_based_auto_wrap_policytransformer_auto_wrap_policy_no_split_modulesNz+Could not find the transformer layer class z in the model.r   r  c                    s    j d u rdS t| tS )NF)transformer_cls_names_to_wrapr   r-  )r   r>  transformer_cls_to_wrapr   r   policy  s   
z.fsdp2_prepare_auto_wrap_policy.<locals>.policyc                    s    t dd |  D }| jkS )Nc                 s   s    | ]}|  V  qd S )N)numelr   r   r   r   	<genexpr>  s    zAfsdp2_prepare_auto_wrap_policy.<locals>.policy.<locals>.<genexpr>)sum
parametersmin_num_params)r   module_num_params)r>  r   r   rO    s   
)torch.distributed.fsdp.wraprI  rJ  auto_wrap_policyr   	functoolspartialfuncr?   r   rL  setr   rs   addr\   r   r   bool)
r>  r+   rI  rJ  fnno_split_modulesrL  layer_classtransformer_clsrO  r   rM  r   r	    s.   

r	  c                  K   s   ddl m} |di | S )a  
    Returns a `GradScaler` for FSDP2, as the current implementation of `get_grad_scaler` doesn't accept other args. We
    need this as current `get_grad_scaler` accepts only `distributed_type` as arg, which doesn't differentiate between
    FSDP1 and FSDP2
    r   )
GradScalerNr   )torch.amp.grad_scalerrb  )kwargsrb  r   r   r   get_fsdp2_grad_scaler  s   re  named_paramsc                 C   s:   dd |   D } dd |   D } dd |   D } | S )a6  Removes parameter name modifiers in order to map them back to their original names.

    See huggingface/accelerate#3554 for more context.

    Args:
        named_params (`dict`): The named parameters dictionary to canonicalize.

    Returns:
        `dict`: The canonicalized named parameters dictionary
    c                 S      i | ]\}}| d d|qS )z._checkpoint_wrapped_module replacer  r   r   r   r#    r$  z,fsdp2_canonicalize_names.<locals>.<dictcomp>c                 S   s,   i | ]\}}| d r|d dn||qS )z
_orig_mod.rh  )
startswithrj  r  r   r   r   r#    s     c                 S   rg  )z
._orig_modrh  ri  r  r   r   r   r#     r$  )r   )rf  r   r   r   fsdp2_canonicalize_names  s   rl  modulesc                 C   s   | du rt  S g }t| tr/t| }g }| D ]\}}||r,|| || q|} | D ]}|	t
|  q1t |S )zConverts modules to parameters where modules can be a string or list of torch.nn.Module

    Args:
        modules (`Union[Iterable[torch.nn.Module], str]`): List of modules

    Returns:
        `set[torch.nn.Parameter]`: List of parameters
    N)r[  r   strrecompilenamed_modules	fullmatchr   r   extendr   rS  )rm  r+   r   rS  regmapped_modulesr   r   r   r   r   r0  $  s   




r0  )FN)r   F)r   )T)TF)Dr5  rX  r   ro  r   r;  collectionsr   collections.abcr   
contextlibr   pathlibr   typingr   r   r\   loggingr	   	constantsr   r   r   r   dataclassesr   modelingr   r   otherr   r   r   versionsr   r3  rZ   r   r    r.   r2   rB   rj   r{   r   r   rn  r]  r   r   r   r   r   r   dictr   r   	Optimizerr  r  rH  r	  re  rl  r[  	Parameterr0  r   r   r   r   <module>   sh   




:
H
03
7.R!$r5
