o
    iz                     @   s.  d Z ddlZddlmZ ddlZddlZddlm	Z	 ddl
mZ ddlmZmZmZ e r7ddlZddlmZ eeZdd	 Ze rMe rMdd
lmZ nddlmZ G dd deZG dd deZdadd Zdd Zdd Zdd Z dd Z!d(ddZ"dd Z#d)dd Z$d*d"d#Z%d)d$d%Z&d&d' Z'dS )+z
Integration with Deepspeed
    N)partialmethod   )dep_version_check)is_accelerate_availableis_torch_availablelogging)nnc                  C   s@   t jdd u} | rztd}W dS  tjy   Y dS w d S )N	deepspeedTF)	importlibutil	find_specimportlib_metadatametadataPackageNotFoundError)package_exists_ r   e/sda-disk/www/egybert/egybert_env/lib/python3.10/site-packages/transformers/integrations/deepspeed.pyis_deepspeed_available$   s   
r   )HfDeepSpeedConfig)objectc                       s    e Zd ZdZ fddZ  ZS )r   aJ  
    This object contains a DeepSpeed configuration dictionary and can be quickly queried for things like zero stage.

    A `weakref` of this object is stored in the module's globals to be able to access the config from areas where
    things like the Trainer object is not available (e.g. `from_pretrained` and `_get_resized_embeddings`). Therefore
    it's important that this object remains alive while the program is still running.

    [`Trainer`] uses the `HfTrainerDeepSpeedConfig` subclass instead. That subclass has logic to sync the configuration
    with values of [`TrainingArguments`] by replacing special placeholder values: `"auto"`. Without this special logic
    the DeepSpeed configuration is not modified in any way.

    Args:
        config_file_or_dict (`Union[str, Dict]`): path to DeepSpeed config file or dict.

    c                    s(   t |  td td t | d S )N
accelerater	   )set_hf_deepspeed_configr   super__init__selfconfig_file_or_dict	__class__r   r   r   J   s   zHfDeepSpeedConfig.__init__)__name__
__module____qualname____doc__r   __classcell__r   r   r   r   r   9   s    r   c                       sX   e Zd ZdZ fddZdd Zdd Zdd
dZeeddZ	dddZ
dd Z  ZS )HfTrainerDeepSpeedConfigz
    The `HfTrainerDeepSpeedConfig` object is meant to be created during `TrainingArguments` object creation and has the
    same lifespan as the latter.
    c                    s   t  | d | _g | _d S N)r   r   _dtype
mismatchesr   r   r   r   r   X   s   
z!HfTrainerDeepSpeedConfig.__init__c                 C   s   | j d u r	td| j S )Nz8trainer_config_process() wasn't called yet to tell dtype)r'   
ValueErrorr   r   r   r   dtype]   s   
zHfTrainerDeepSpeedConfig.dtypec                 C   s   |  |}|d u rdS |dkS )NFauto)	get_value)r   ds_key_longvalr   r   r   is_autob   s   
z HfTrainerDeepSpeedConfig.is_autoNTc              
   C   s   |  |\}}|du rdS ||dkr|||< dS |sdS ||}|dur?||krA| jd| d| d| d|  dS dS dS )a  
        A utility method that massages the config file and can optionally verify that the values match.

        1. Replace "auto" values with `TrainingArguments` value.

        2. If it wasn't "auto" and `must_match` is true, then check that DS config matches Trainer
        config values and if mismatched add the entry to `self.mismatched` - will assert during
        `trainer_config_finalize` for one or more mismatches.

        Nr,   z- ds =z vs hf )find_config_nodegetr(   append)r   r.   hf_valhf_key
must_matchconfigds_keyds_valr   r   r   
fill_matchi   s   
(z#HfTrainerDeepSpeedConfig.fill_matchF)r7   c                 C   sH  |j |j |j }| d|jd|  | d|jd | d|d|  | d|jd | d|jd	 | d
|j|jgd | d|jd | d|j	d | 
dd | d|jd	 |jrr| jdi | jd< |j| jd d< | d|jpz|jd | d|jp|jd | drtj| _dS | drtj| _dS tj| _dS )z
        Adjust the config with `TrainingArguments` values. This stage is run during `TrainingArguments` object
        creation.
        train_micro_batch_size_per_gpuper_device_train_batch_sizegradient_accumulation_stepstrain_batch_sizeztrain_batch_size (calculated)gradient_clippingmax_grad_normzoptimizer.params.lrlearning_ratezoptimizer.params.betaszadam_beta1+adam_beta2zoptimizer.params.epsadam_epsilonzoptimizer.params.weight_decayweight_decayzscheduler.params.warmup_min_lrr   zscheduler.params.warmup_max_lr
checkpointuse_node_local_storagezfp16.enabledzfp16|fp16_full_evalzbf16.enabledzbf16|bf16_full_evalN)
world_sizer=   r>   r;   rA   rB   
adam_beta1
adam_beta2rC   rD   	fill_onlysave_on_each_noder8   r3   fp16fp16_full_evalbf16bf16_full_evalis_truetorchbfloat16r'   float16float32)r   argsauto_find_batch_sizer?   r   r   r   trainer_config_process   sN   


z/HfTrainerDeepSpeedConfig.trainer_config_processc                    sZ  g d} fdd|D }t |dkrd}t|drYt|jdr%|jj}n4t|jdr2t|jj}n't|jd	rEt|jjdrE|jjj}nt|jd	rYt|jjdrYt|jjj}|du retd
| d d||   	 r dt
d| |   dd|   d|d  d||d t  jdkrd j}td| ddS )z
        This stage is run after we have the model and know num_training_steps.

        Now we can complete the configuration process.
        )$zero_optimization.reduce_bucket_size-zero_optimization.stage3_prefetch_bucket_size4zero_optimization.stage3_param_persistence_thresholdc                    s   g | ]	}  |r|qS r   )r0   ).0xr*   r   r   
<listcomp>       zDHfTrainerDeepSpeedConfig.trainer_config_finalize.<locals>.<listcomp>r   Nr8   hidden_sizehidden_sizestext_configzThe model's config file has neither `hidden_size` nor `hidden_sizes` entry, therefore it's not possible to automatically fill out the following `auto` entries in the DeepSpeed config file: zb. You can fix that by replacing `auto` values for these keys with an integer value of your choice.rX   rY   g?rZ   
   z scheduler.params.total_num_stepsznum_training_steps (calculated)z!scheduler.params.warmup_num_stepswarmup_steps
z]Please correct the following DeepSpeed config values that mismatch TrainingArguments values:
zF
The easiest method is to set these DeepSpeed config values to 'auto'.)lenhasattrr8   r_   maxr`   ra   r)   rJ   is_zero3intr;   get_warmup_stepsr(   join)r   rU   modelnum_training_stepshidden_size_based_keyshidden_size_auto_keysr_   r(   r   r*   r   trainer_config_finalize   s^   	

z0HfTrainerDeepSpeedConfig.trainer_config_finalize)NTF)r    r!   r"   r#   r   r+   r0   r;   r   rJ   rW   rp   r$   r   r   r   r   r%   R   s    

:r%   c                 C   s   t | ad S r&   )weakrefref_hf_deepspeed_config_weak_ref)hf_deepspeed_config_objr   r   r   r   	  s   r   c                   C   s   d a d S r&   )rt   r   r   r   r   unset_hf_deepspeed_config  s   rv   c                   C   s    t d urt  d urt   S dS )NF)rt   rh   r   r   r   r   is_deepspeed_zero3_enabled  s   
rw   c                   C   s   t d urt  d urt  jS d S r&   )rt   r8   r   r   r   r   deepspeed_config  s   rx   c                    s  t  }|dur1|di dd}|di }t|tr)t||di dd}|dkr1tddd	lm mm	m
} t|d
d}| j}i }	|   D ]\}
}tj|j|jdd|	|
< qNfdd|D } fdd|D }t|dkri }| D ]\}}|||g ||	\}}||	v r|||< q}|dur||_|S dd |D }i }i }t| fddd}|D ]:}||}||||||	\}}||	v r|dur|| } |j|j|jd}|||}||||| q|||< q| D ]A\}}z%|j|| | jd}| D ]\}}t|tr|d n|}|||< qW q t y6 } zt!d| d| |d}~ww |dur?||_|S )z
    Apply weight conversions (renaming and merging/splitting operations) to a state dict.
    This is a simplified version that handles the conversion without loading into the model.
    Ntensor_parallelautotp_size   	inferencetp_sizezWeight conversions (e.g., MoE expert fusion) with DeepSpeed Tensor Parallelism are not yet implemented but support is coming soon. Please disable tensor_parallel in your DeepSpeed config or convert your checkpoint to the expected format first.r   )WeightConverterWeightRenamingdot_natural_keyrename_source_key	_metadatameta)r+   devicec                       g | ]	}t | r|qS r   
isinstancer[   entry)r   r   r   r]   H  r^   z;_apply_weight_conversions_to_state_dict.<locals>.<listcomp>c                    r   r   r   r   )r~   r   r   r]   I  r^   r   c                 S   s   i | ]}|j D ]}||qqS r   )source_patterns)r[   	converterkr   r   r   
<dictcomp>X  s    z;_apply_weight_conversions_to_state_dict.<locals>.<dictcomp>c                    s    | S r&   r   )r   )r   r   r   <lambda>_  s    z9_apply_weight_conversions_to_state_dict.<locals>.<lambda>)key)r   target_patterns
operations)rl   r8   z'Failed to apply weight conversion for 'zb'. This likely means the checkpoint format is incompatible with the current model version. Error: )"rx   r3   r   dictrg   NotImplementedErrorcore_model_loadingr~   r   r   r   getattrbase_model_prefix
state_dictitemsrQ   emptyshaper+   re   r   sortedkeyspopr   r   r   
setdefault
add_tensorconvertr8   list	ExceptionRuntimeError)rl   r   weight_mapping	ds_configr}   inference_configr   r   prefixmodel_state_dictr   param	renamings
convertersnew_state_dictoriginal_keytensorrenamed_keyr   pattern_to_converterconversion_mappingsorted_keyssource_patternr   new_convertermappingrealized_valuetarget_nameer   )r~   r   r   r   '_apply_weight_conversions_to_state_dict%  s   


r   c                    s   t |dd| }dur|_d}|durt |dd}|dur0t|dkr0t| ||}|| _g  |  t t | ddfdd|	 D }dd
t
jf fdd| |d	d  fS )a  
    Loads state dict into a model specifically for Zero3, since DeepSpeed does not support the `transformers`
    tensor parallelism API.

    Nearly identical code to PyTorch's `_load_from_state_dict`

    Args:
        model_to_load: The model to load weights into
        state_dict: The state dict containing the weights
        load_config: Optional LoadStateDictConfig containing weight_mapping and other loading options
    r   Nr   r   r   c                    s<   i | ]\}}   d | dur d | n||qS ).N)r3   )r[   r   v)meta_model_state_dictprefix_modelr   r   r     s    *z5_load_state_dict_into_zero3_model.<locals>.<dictcomp> Fmodulec                    s$  d u ri n	 |d d i }||d< |||dg g  f}t rwdd l}t| j|d d dd}g }|D ]}	|	|v rL||	 }
d|
_||
 |	 q5t|dkrw|j	j
|dd tj dkrh| j|  W d    n1 srw   Y  | j D ]\}}|d ur|||| d | q|d S )	Nassign_to_params_buffersTr   F)r   recurse)modifier_rankr   )r3   rw   r	   r   named_parameters_is_hf_initializedr4   discardre   zeroGatheredParametersrQ   distributedget_rank_load_from_state_dict_modulesr   )r   r   r   r   local_metadatarU   r	   r   params_to_gatherr   r   namechild)
error_msgsloadr   missing_keysr   r   r     s2    


z/_load_state_dict_into_zero3_model.<locals>.load)r   )r   F)r   copyr   re   r   _weight_conversionsr   setr   r   r   Module)model_to_loadr   load_configr   r   )r   r   r   r   r   r   r   !_load_state_dict_into_zero3_model  s(   "r   c                    s   ddl m}m} |j}d}d|v r||d}n| r td  }d|d< d}	d	|v r6||}	||	fS t||rH fd
d}
|||
d}	||	fS )zY
    A convenience wrapper that deals with optimizer and lr scheduler configuration.
    r   )
DummyOptimDummySchedulerN	optimizer)paramszDetected ZeRO Offload and non-DeepSpeed optimizers: This combination should work as long as the custom optimizer has both CPU and GPU implementation (except LAMB)Tzero_allow_untested_optimizer	schedulerc                    s"   t  }d |_|j | d}|S )N)rm   r   )r   lr_schedulercreate_scheduler)r   trainer_copyr   rm   trainerr   r   _lr_scheduler_callable  s   
z5deepspeed_optim_sched.<locals>._lr_scheduler_callable)lr_scheduler_callable)	accelerate.utilsr   r   r8   
is_offloadloggerinfocreate_optimizerr   )r   hf_deepspeed_configrU   rm   model_parametersr   r   r8   r   r   r   r   r   r   deepspeed_optim_sched  s&   

r   Fc                 C   s   ddl m} | j}| j}| jjjj}|||| |	|
  |r>| s*td|d |d d\}}d}	||fS d| _|jdi d	d
}
|
d
kr`ddl}|j||
| |jd}ttdd | }	t| ||||	\}}||fS )a  
    Init DeepSpeed, after updating the DeepSpeed configuration with any relevant Trainer's args.

    If `resume_from_checkpoint` was passed then an attempt to resume from a previously saved checkpoint will be made.

    Args:
        trainer: Trainer object
        num_training_steps: per single gpu
        resume_from_checkpoint: path to a checkpoint if to resume from after normal DeepSpeedEngine load
        inference: launch in inference mode (no optimizer and no lr scheduler)
        auto_find_batch_size: whether to ignore the `train_micro_batch_size_per_gpu` argument as it's being
            set automatically by the auto batch size finder

    Returns: optimizer, lr_scheduler

    We may use `deepspeed_init` more than once during the life of Trainer, when we do - it's a temp hack based on:
    https://github.com/deepspeedai/DeepSpeed/issues/1394#issuecomment-937405374 until Deepspeed fixes a bug where it
    can't resume from a checkpoint after it did some stepping https://github.com/deepspeedai/DeepSpeed/issues/1612

    r   )r   zMZeRO inference only makes sense with ZeRO Stage 3 - please adjust your configr   r   )NNNry   rz   r{   )rl   r}   r+   r8   c                 S   s   | j S r&   )requires_grad)pr   r   r   r   O  s    z deepspeed_init.<locals>.<lambda>)deepspeed.utilsr   rl   rU   acceleratorstatedeepspeed_pluginhf_ds_configrp   setLevelget_process_log_levelrh   r)   del_config_sub_treer   r8   r3   r	   tp_model_initr+   r   filter
parametersr   )r   rm   r|   	ds_loggerrl   rU   r   r   r   r   deepspeed_tp_sizer	   r   r   r   deepspeed_init  s:   


r   Tc                 C   sv   dd l }t| | d}t|dkr4td|  | j||ddd\}}|d u r2td| d S td| )Nr   z/global_step*zAttempting to resume from T)load_module_strictload_optimizer_statesload_lr_scheduler_statesz-[deepspeed] failed to resume from checkpoint z!Can't find a valid checkpoint at )globr   re   r   r   load_checkpointr)   )deepspeed_enginecheckpoint_pathr   r  deepspeed_checkpoint_dirs	load_pathr   r   r   r   deepspeed_load_checkpointZ  s   
r	  c                 C   s2   | j j}t|jj|_|jj|_|j|| dS )a  
    Sets values in the deepspeed plugin based on the TrainingArguments.

    Args:
        accelerator (`Accelerator`): The Accelerator object.
        args (`TrainingArguments`): The training arguments to propagate to DeepSpeed config.
        auto_find_batch_size (`bool`, *optional*, defaults to `False`):
            Whether batch size was auto-discovered by trying increasingly smaller sizes.
    N)r   r   r%   r   r8   rx   rW   )r   rU   rV   	ds_pluginr   r   r   propagate_args_to_deepspeedr  s   

r  c                    s   d|vrd|v r|d |d< |di |}|j }| jd  }|j}tjjjj||d|d dk	d
 }	tjjjj|	|d t
 fddt|D }
t
 }|
t|d	 }|r`||fS |S )aq  
    Computes the loss under sequence parallelism with `sp_backend="deepspeed"` and `sp_size > 1`.

    Performs weighted loss aggregation across SP ranks, accounting for varying numbers of valid tokens per rank
    (e.g., when some ranks receive only padding or prompt tokens that are masked with -100).

    Args:
        accelerator (`Accelerator`): The accelerator instance with `torch_device_mesh` support.
        model (`torch.nn.Module`): The model to compute the loss for.
        inputs (`dict[str, torch.Tensor | Any]`): The input data for the model. Must include `"shift_labels"` key.
        return_outputs (`bool`): Whether to return the model outputs along with the loss.
        pc (`accelerate.parallelism_config.ParallelismConfig`): The parallelism configuration.

    Returns:
        The loss, or a tuple of `(loss, outputs)` if `return_outputs` is `True`.
    labelsshift_labelssp)groupir   c                 3   s,    | ]} | d kr|  |  V  qdS )r   Nr   )r[   rankgood_tokens_per_ranklosses_per_rankr   r   	<genexpr>  s    z,deepspeed_sp_compute_loss.<locals>.<genexpr>r{   Nr   )losstorch_device_mesh	get_groupsp_sizerQ   r   r   
functional
all_gatherviewsumrangerg   )r   rl   inputsreturn_outputspcoutputsr  sp_groupsp_world_sizegood_tokens
total_losstotal_good_tokensr   r  r   deepspeed_sp_compute_loss  s   r'  r&   rq   )T)(r#   r   importlib.metadatar   r   importlib.utilr
   rr   	functoolsr   dependency_versions_checkr   utilsr   r   r   rQ   r   
get_loggerr    r   r   accelerate.utils.deepspeedr   DeepSpeedConfigbuiltinsr   r%   rt   r   rv   rw   rx   r   r   r   r   r	  r  r'  r   r   r   r   <module>   s>   
 5
kQ
6
C
