class MinTokensLogitsProcessor(LogitsProcessor):
    """Forces generation to continue until ``min_tokens`` is reached.

    For every request that has produced fewer than ``min_tokens`` output
    tokens, all of its stop tokens (including EOS) are masked to ``-inf``
    so they cannot be sampled.
    """

    def __init__(
        self, vllm_config: "VllmConfig", device: torch.device, is_pin_memory: bool
    ):
        self.device = device
        self.pin_memory = is_pin_memory
        # request index -> (min_toks, output_token_ids, stop_token_ids)
        self.min_toks: dict[int, tuple[int, Sequence[int], set[int]]] = {}
        # Index pair (request-row tensor, stop-token-id tensor) consumed by
        # index_put_ in apply(); starts out empty.
        self.logits_slice: tuple[torch.Tensor, torch.Tensor] = tuple(
            self._device_tensor([], torch.int32) for _ in range(2)
        )
        self.neg_inf_tensor = torch.tensor(
            -float("inf"), dtype=torch.float32, device=self.device
        )

    def is_argmax_invariant(self) -> bool:
        """Masking stop tokens can alter which token wins the argmax in
        greedy sampling, so this processor is not argmax-invariant."""
        return False

    @staticmethod
    def add_request(
        params: SamplingParams, _: list[int] | None, output_tok_ids: list[int]
    ) -> tuple[int, Sequence[int], set[int]] | None:
        """Build the per-request state tuple, or None if no masking is needed."""
        needed = params.min_tokens
        if needed and len(output_tok_ids) < needed:
            return needed, output_tok_ids, params.all_stop_token_ids
        return None

    def update_state(self, batch_update: BatchUpdate | None):
        """Sync per-request state with the batch and rebuild index tensors."""
        needs_update = process_dict_updates(
            self.min_toks, batch_update, self.add_request
        )
        if self.min_toks:
            # Drop every request that has already reached its minimum length.
            satisfied = [
                idx
                for idx, (needed, produced, _) in self.min_toks.items()
                if len(produced) >= needed
            ]
            for idx in satisfied:
                del self.min_toks[idx]
            needs_update = needs_update or bool(satisfied)
        # Rebuild the flattened (row, token) index tensors if anything changed.
        if needs_update:
            rows: list[int] = []
            cols: list[int] = []
            for idx, (_, _, stops) in self.min_toks.items():
                rows += [idx] * len(stops)
                cols += stops
            self.logits_slice = (
                self._device_tensor(rows, torch.int32),
                self._device_tensor(cols, torch.int32),
            )

    def _device_tensor(self, data: list, dtype: torch.dtype) -> torch.Tensor:
        """Stage *data* in a (possibly pinned) CPU tensor, then copy it to
        the target device asynchronously."""
        staging = torch.tensor(
            data, device="cpu", dtype=dtype, pin_memory=self.pin_memory
        )
        return staging.to(device=self.device, non_blocking=True)

    def apply(self, logits: torch.Tensor) -> torch.Tensor:
        """Mask stop tokens in-place for requests below their min length."""
        if not self.min_toks:
            return logits
        # Inhibit EOS/stop tokens for requests which have not reached min length.
        logits.index_put_(self.logits_slice, self.neg_inf_tensor)
        return logits

    def apply_with_spec_decode(
        self,
        logits: torch.Tensor,
        num_draft_tokens: list[int],
    ) -> torch.Tensor:
        """Spec-decode version of apply().
        Priority: ``min_tokens`` > ``stop_token_ids`` / EOS.
        Example: ``num_draft_tokens = [2, 3, 1]``
        → ``logits`` shape ``[6, V]``, ``cumsum = [0, 2, 5, 6]``
        → request 0 owns rows 0‑1, request 1 rows 2‑4, request 2 row 5.
        """
        if not self.min_toks:
            return logits

        draft_counts = np.asarray(num_draft_tokens, dtype=np.int64)
        # starts[i] is the first logits row owned by request i.
        starts = np.concatenate([[0], np.cumsum(draft_counts)])

        row_chunks: list[np.ndarray] = []  # row indices to mask
        tok_chunks: list[np.ndarray] = []  # stop-token ids at those rows
        for req_idx, (needed, produced, stops) in self.min_toks.items():
            if not stops:
                continue
            # Leading draft positions that still fall short of min_tokens.
            pending = needed - len(produced)
            n_mask = int(min(max(pending, 0), draft_counts[req_idx]))
            if n_mask <= 0:
                continue
            first = starts[req_idx]
            masked_rows = np.arange(first, first + n_mask, dtype=np.int64)
            stop_list = list(stops)
            # Pair every masked row with every stop token of this request.
            row_chunks.append(np.repeat(masked_rows, len(stop_list)))
            tok_chunks.append(np.tile(stop_list, n_mask))

        if row_chunks:
            # (row_indices, token_indices) for index_put_ to set -inf.
            slice_rows = torch.from_numpy(np.concatenate(row_chunks)).to(
                self.device, non_blocking=True
            )
            slice_toks = torch.from_numpy(np.concatenate(tok_chunks)).to(
                self.device, non_blocking=True
            )
            logits.index_put_((slice_rows, slice_toks), self.neg_inf_tensor)
        return logits