import os

import numpy as np
import torch
from PIL import Image
from typing import Optional, Union, Tuple, Dict, List, Iterable

from torchvision.transforms import Compose, ToTensor, Normalize, ToPILImage, RandomResizedCrop, Resize
from transformers.image_processing_utils import BaseImageProcessor, BatchFeature
from transformers.image_transforms import to_channel_dimension_format, pad, PaddingMode
from transformers.image_utils import (
    ChannelDimension,
    get_image_size,
    infer_channel_dimension_format,
    make_list_of_images,
    to_numpy_array,
)
from transformers.utils import (
    TensorType,
    is_torch_device,
    is_torch_dtype,
    requires_backends,
)

try:
    from torchvision.transforms import InterpolationMode

    BICUBIC = InterpolationMode.BICUBIC
except ImportError:
    BICUBIC = Image.BICUBIC

# When set to 1, `anyres` preprocessing goes through the vLLM-specific path, which
# stacks the patches of each image and pads the batch to a common patch count.
processor_for_vllm = int(os.getenv("PROCESSOR_FOR_VLLM", "0"))


def select_best_resolution(original_size, possible_resolutions):
    """
    Selects the best resolution from a list of possible resolutions based on the original size.

    Args:
        original_size (tuple): The original size of the image in the format (width, height).
        possible_resolutions (list): A list of possible resolutions in the format [(width1, height1), (width2, height2), ...].

    Returns:
        tuple: The best fit resolution in the format (width, height).
    """
    original_width, original_height = original_size
    best_fit = None
    max_effective_resolution = 0
    min_wasted_resolution = float("inf")

    for width, height in possible_resolutions:
        # Downscale the original image so it fits inside the candidate resolution.
        scale = min(width / original_width, height / original_height)
        downscaled_width, downscaled_height = int(original_width * scale), int(original_height * scale)

        # The effective resolution is capped at the original size: upscaling adds no information.
        effective_resolution = min(downscaled_width * downscaled_height, original_width * original_height)
        wasted_resolution = (width * height) - effective_resolution

        # Prefer the candidate that keeps the most pixels; break ties by the least padding waste.
        if effective_resolution > max_effective_resolution or (
            effective_resolution == max_effective_resolution and wasted_resolution < min_wasted_resolution
        ):
            max_effective_resolution = effective_resolution
            min_wasted_resolution = wasted_resolution
            best_fit = (width, height)

    return best_fit
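
# Illustrative example (hypothetical input size): for a 1000x750 image and the default
# pinpoints defined below, (672, 672) wins, because its aspect-preserving downscale
# (672x504) keeps more effective pixels than any other candidate.
#   >>> select_best_resolution((1000, 750), [[336, 672], [672, 336], [672, 672], [1008, 336], [336, 1008]])
#   (672, 672)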


def divide_to_patches(image, patch_size):
    """
    Divides an image into patches of a specified size.

    Args:
        image (PIL.Image.Image): The input image.
        patch_size (int): The size of each patch.

    Returns:
        list: A list of PIL.Image.Image objects representing the patches.
    """
    patches = []
    width, height = image.size
    for i in range(0, height, patch_size):
        for j in range(0, width, patch_size):
            box = (j, i, j + patch_size, i + patch_size)
            patch = image.crop(box)
            patches.append(patch)

    return patches
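
# Illustrative example (hypothetical size): a 672x672 image with patch_size=336 yields
# a 2x2 grid of tiles, returned in row-major order (left-to-right, top-to-bottom).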


def image_size_to_num_patches(image_size, grid_pinpoints, patch_size):
    """
    Computes how many patches the `anyres` pipeline produces for an image of
    `image_size`, including the extra base (thumbnail) image.
    """
    if not isinstance(grid_pinpoints, list):
        raise TypeError("grid_pinpoints should be a list of tuples or lists")

    # Convert tensors/arrays so the size can be unpacked as plain Python ints.
    if not isinstance(image_size, (list, tuple)):
        if not isinstance(image_size, (torch.Tensor, np.ndarray)):
            raise TypeError(f"image_size invalid type {type(image_size)} with value {image_size}")
        image_size = image_size.tolist()

    best_resolution = select_best_resolution(image_size, grid_pinpoints)
    width, height = best_resolution
    num_patches = 0
    for i in range(0, height, patch_size):
        for j in range(0, width, patch_size):
            num_patches += 1
    # Add one for the base image, which is always prepended to the patch list.
    num_patches += 1
    return num_patches
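
# Illustrative example (hypothetical values): an (800, 600) image with the default
# pinpoints and a 336-pixel tile size selects (672, 672), which tiles into 2x2 = 4
# patches, plus 1 for the base image:
#   >>> image_size_to_num_patches((800, 600), [[336, 672], [672, 336], [672, 672], [1008, 336], [336, 1008]], 336)
#   5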


def get_anyres_image_grid_shape(image_size, grid_pinpoints, patch_size):
    """
    Calculate the shape of the image patch grid after the preprocessing for images of any resolution.

    Args:
        image_size (`tuple`):
            The size of the input image in the format (width, height).
        grid_pinpoints (`List`):
            A list containing possible resolutions. Each item in the list should be a tuple or list
            of the form `(width, height)`, matching the convention of `select_best_resolution`.
        patch_size (`int`):
            The size of each image patch.

    Returns:
        tuple: The shape of the image patch grid in the format (width, height).
    """
    if not isinstance(grid_pinpoints, list):
        raise TypeError("grid_pinpoints should be a list of tuples or lists")

    if not isinstance(image_size, (list, tuple)):
        if not isinstance(image_size, (torch.Tensor, np.ndarray)):
            raise TypeError(
                f"image_size invalid type: {type(image_size)} not valid, should be either list, tuple, np.ndarray or tensor"
            )
        image_size = image_size.tolist()

    width, height = select_best_resolution(image_size, grid_pinpoints)
    return width // patch_size, height // patch_size
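
# Illustrative example (hypothetical values): with the (672, 672) resolution selected
# for an (800, 600) image and a tile size of 336, the grid shape is (2, 2).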


class KeepRatioResize(object):
    def __init__(self, size):
        self.size = size

    def __call__(self, image):
        return keepratio_resize(image, self.size)


# Alias kept for the original spelling of the class name.
KeeyRatioResize = KeepRatioResize


def keepratio_resize(image, size, return_scale=False):
    """Resize `image` to fit inside `size` keeping its aspect ratio, padding the rest with black."""
    w, h = image.size
    resized_w, resized_h = size
    if w / h > resized_w / resized_h:
        # Wider than the target: fit the width and pad the height (letterbox).
        new_h = int(resized_w * h / w)
        resized_image = image.resize((resized_w, new_h), Image.BICUBIC)
        image = Image.new('RGB', (resized_w, resized_h), (0, 0, 0))
        pad_h = (resized_h - new_h) // 2
        image.paste(resized_image, (0, pad_h))
        scale = resized_w / w
    else:
        # Taller than the target: fit the height and pad the width (pillarbox).
        new_w = int(resized_h * w / h)
        resized_image = image.resize((new_w, resized_h), Image.BICUBIC)
        image = Image.new('RGB', (resized_w, resized_h), (0, 0, 0))
        pad_w = (resized_w - new_w) // 2
        image.paste(resized_image, (pad_w, 0))
        scale = resized_h / h
    if return_scale:
        return image, scale
    return image
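
# Illustrative example (hypothetical size): a 1000x500 image resized to (336, 336)
# becomes a 336x168 image centered vertically, with 84 black rows above and below.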


def _convert_image_to_rgb(image):
    return image.convert("RGB")


def _transform(img_h, img_w, image_mean=(0.48145466, 0.4578275, 0.40821073), image_std=(0.26862954, 0.26130258, 0.27577711)):
    # `img_h` and `img_w` are unused but kept for signature compatibility: every
    # caller resizes the image before applying this transform.
    return Compose([
        _convert_image_to_rgb,
        ToTensor(),
        Normalize(image_mean, image_std),
    ])


def get_hw_multiple_of(image_size, multiple, max_size=None):
    """Round (width, height) up to the nearest multiple, optionally shrinking to fit inside `max_size`."""
    w, h = image_size
    new_w = w if w % multiple == 0 else w + (multiple - w % multiple)
    new_h = h if h % multiple == 0 else h + (multiple - h % multiple)
    if max_size is not None:
        assert isinstance(max_size, (list, tuple)) and len(max_size) == 2
        max_w, max_h = max_size
        assert max_w % multiple == 0 and max_h % multiple == 0
        if new_w > max_w or new_h > max_h:
            # Scale down to fit inside (max_w, max_h) while keeping the aspect ratio,
            # computing both sides from the same pre-update values.
            scale = min(max_w / new_w, max_h / new_h)
            new_w, new_h = int(new_w * scale), int(new_h * scale)
            # Round back up to a multiple; since max_w and max_h are themselves
            # multiples, this cannot overshoot the bounds.
            new_w = new_w if new_w % multiple == 0 else new_w + (multiple - new_w % multiple)
            new_h = new_h if new_h % multiple == 0 else new_h + (multiple - new_h % multiple)
        assert new_w % multiple == 0 and new_h % multiple == 0
        assert new_w <= max_w and new_h <= max_h
    return new_w, new_h
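
# Illustrative example (hypothetical values): (500, 300) with multiple=14 rounds up to
# (504, 308); with max_size=(448, 448) it is scaled down to fit inside the bound and
# rounded back up to multiples of 14, giving (448, 280).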


def resize_multiple_of(image, multiple, max_size=None):
    """
    Resize the image so that its width and height are multiples of a number.

    Args:
        image (PIL.Image.Image): The input image.
        multiple (int): The value the output width and height must be a multiple of.
        max_size (tuple, *optional*): A (max_width, max_height) bound on the output size.

    Returns:
        PIL.Image.Image: The resized image.
    """
    width, height = image.size
    new_width, new_height = get_hw_multiple_of((width, height), multiple, max_size)
    return image.resize((new_width, new_height), Image.BICUBIC)


class CustomBatchFeature(BatchFeature):
    def convert_to_tensors(self, tensor_type: Optional[Union[str, TensorType]] = None):
        """
        Convert the inner content to tensors.

        Args:
            tensor_type (`str` or [`~utils.TensorType`], *optional*):
                The type of tensors to use. If `str`, should be one of the values of the enum [`~utils.TensorType`]. If
                `None`, no modification is done.
        """
        if tensor_type is None:
            return self

        is_tensor, as_tensor = self._get_is_as_tensor_fns(tensor_type)

        for key, value in self.items():
            if key == "pixel_values":
                # Pixel values may differ in shape from image to image, so they are
                # converted one by one instead of being stacked into a single tensor.
                for i, image in enumerate(value):
                    if not is_tensor(image):
                        tensor = as_tensor(image)
                        self[key][i] = tensor
                continue
            try:
                if not is_tensor(value):
                    tensor = as_tensor(value)
                    self[key] = tensor
            except Exception:
                if key == "overflowing_values":
                    raise ValueError("Unable to create tensor returning overflowing values of different lengths.")
                raise ValueError(
                    "Unable to create tensor, you should probably activate padding "
                    "with 'padding=True' to have batched tensors with the same length."
                )

        return self

    def to(self, *args, **kwargs) -> "BatchFeature":
        """
        Send all values to device by calling `v.to(*args, **kwargs)` (PyTorch only). This should support casting in
        different `dtypes` and sending the `BatchFeature` to a different `device`.

        Args:
            args (`Tuple`):
                Will be passed to the `to(...)` function of the tensors.
            kwargs (`Dict`, *optional*):
                Will be passed to the `to(...)` function of the tensors.

        Returns:
            [`BatchFeature`]: The same instance after modification.
        """
        requires_backends(self, ["torch"])
        import torch

        new_data = {}
        device = kwargs.get("device")
        # Check if the args are a device or a dtype.
        if device is None and len(args) > 0:
            arg = args[0]
            if is_torch_dtype(arg):
                # The first argument is a dtype.
                pass
            elif isinstance(arg, str) or is_torch_device(arg) or isinstance(arg, int):
                device = arg
            else:
                # It is something else.
                raise ValueError(f"Attempting to cast a BatchFeature to type {str(arg)}. This is not supported.")
        for k, v in self.items():
            if k == "pixel_values":
                # Pixel values are a list of per-image tensors; move each one.
                new_data[k] = [v[i].to(*args, **kwargs) for i in range(len(v))]
                continue
            # Only cast floating-point tensors to avoid corrupting integer fields.
            if torch.is_floating_point(v):
                new_data[k] = v.to(*args, **kwargs)
            elif device is not None:
                new_data[k] = v.to(device=device)
            else:
                new_data[k] = v
        self.data = new_data
        return self
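
    # Illustrative usage (hypothetical): the whole batch can be moved and cast in one
    # call, e.g. `batch = batch.to("cuda", dtype=torch.float16)`; integer fields such
    # as `image_sizes` are moved to the device but never cast.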


def as_tensor(value):
    # Collapse (possibly nested) lists of numpy arrays into one array first, since
    # building a torch tensor directly from a list of arrays is much slower.
    if isinstance(value, (list, tuple)) and len(value) > 0:
        if isinstance(value[0], np.ndarray):
            value = np.array(value)
        elif (
            isinstance(value[0], (list, tuple))
            and len(value[0]) > 0
            and isinstance(value[0][0], np.ndarray)
        ):
            value = np.array(value)
    if isinstance(value, np.ndarray):
        return torch.from_numpy(value)
    else:
        return torch.tensor(value)


class ImageProcessor(BaseImageProcessor):
    model_input_names = ["pixel_values"]

    def __init__(
        self,
        size: Optional[Union[int, Tuple[int, int], Dict[str, int]]] = None,
        image_mean: Optional[Union[float, List[float]]] = None,
        image_std: Optional[Union[float, List[float]]] = None,
        process_image_mode: Optional[str] = 'resize',
        patch_size: Optional[int] = 14,
        image_grid_pinpoints: Optional[List] = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.size = size
        self.image_mean = image_mean
        self.image_std = image_std
        self.process_image_mode = process_image_mode
        image_grid_pinpoints = (
            image_grid_pinpoints
            if image_grid_pinpoints is not None
            else [[336, 672], [672, 336], [672, 672], [1008, 336], [336, 1008]]
        )
        self.image_grid_pinpoints = image_grid_pinpoints
        self.patch_size = patch_size

    def preprocess(self,
                   images,
                   return_tensors: Optional[Union[str, TensorType]] = None,
                   data_format: Optional[ChannelDimension] = ChannelDimension.FIRST,
                   input_data_format: Optional[Union[str, ChannelDimension]] = None,
                   **kwargs,
                   ):
        # Dispatch to the preprocessing strategy chosen at construction time.
        if self.process_image_mode == 'resize':
            return self.resize_preprocess(images, return_tensors, data_format, input_data_format, **kwargs)
        elif self.process_image_mode == 'anyres':
            if processor_for_vllm == 1:
                return self.anyres_for_vllm_preprocess(images, return_tensors, data_format, input_data_format, **kwargs)
            return self.anyres_preprocess(images, return_tensors, data_format, input_data_format, **kwargs)
        elif self.process_image_mode == 'keepratio_resize':
            return self.keepratio_resize_preprocess(images, return_tensors, data_format, input_data_format, **kwargs)
        elif self.process_image_mode == 'dynamic_res':
            return self.dynamic_res_preprocess(images, return_tensors, data_format, input_data_format, **kwargs)
        else:
            raise ValueError(f"Invalid process_image_mode: {self.process_image_mode}")
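
    # Illustrative usage (hypothetical configuration; the mean/std are the CLIP
    # defaults from `_transform`, passed explicitly because `__init__` does not
    # default them):
    #   processor = ImageProcessor(size=(336, 336), process_image_mode='anyres',
    #                              image_mean=(0.48145466, 0.4578275, 0.40821073),
    #                              image_std=(0.26862954, 0.26130258, 0.27577711))
    #   batch = processor.preprocess(pil_image, return_tensors="pt")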

    def resize_preprocess(self, images, return_tensors: Optional[Union[str, TensorType]] = None, data_format: Optional[ChannelDimension] = ChannelDimension.FIRST, input_data_format: Optional[Union[str, ChannelDimension]] = None, **kwargs):
        images = make_list_of_images(images)
        all_images = []
        for image in images:
            resized_image = image.resize(self.size, Image.BICUBIC)
            transform_img = _transform(self.size[1], self.size[0], self.image_mean, self.image_std)(resized_image)
            all_images.append(to_numpy_array(transform_img))

        images = [
            to_channel_dimension_format(image, data_format, input_channel_dim=input_data_format)
            for image in all_images
        ]

        data = {"pixel_values": images}
        return CustomBatchFeature(data=data, tensor_type=return_tensors)

    def keepratio_resize_preprocess(self, images, return_tensors: Optional[Union[str, TensorType]] = None, data_format: Optional[ChannelDimension] = ChannelDimension.FIRST, input_data_format: Optional[Union[str, ChannelDimension]] = None, **kwargs):
        images = make_list_of_images(images)
        all_images = []
        for image in images:
            resized_image = keepratio_resize(image, self.size)
            transform_img = _transform(self.size[1], self.size[0], self.image_mean, self.image_std)(resized_image)
            all_images.append(to_numpy_array(transform_img))

        images = [
            to_channel_dimension_format(image, data_format, input_channel_dim=input_data_format)
            for image in all_images
        ]

        data = {"pixel_values": images}
        return CustomBatchFeature(data=data, tensor_type=return_tensors)

    def dynamic_res_preprocess(self, images, return_tensors: Optional[Union[str, TensorType]] = None, data_format: Optional[ChannelDimension] = ChannelDimension.FIRST, input_data_format: Optional[Union[str, ChannelDimension]] = None, **kwargs):
        images = make_list_of_images(images)
        all_images = []
        image_sizes = []
        for image in images:
            ori_w, ori_h = image.size
            # Original sizes are recorded as (height, width).
            image_sizes.append([ori_h, ori_w])
            resized_image = resize_multiple_of(image, self.patch_size, max_size=self.size)
            resized_w, resized_h = resized_image.size
            transform_img = _transform(resized_h, resized_w, self.image_mean, self.image_std)(resized_image)
            all_images.append(to_numpy_array(transform_img))

        # The resized images can all have different shapes, so each one becomes its own tensor.
        images = [
            as_tensor(to_channel_dimension_format(image, data_format, input_channel_dim=input_data_format))
            for image in all_images
        ]

        data = {"pixel_values": images, "image_sizes": image_sizes}
        return CustomBatchFeature(data=data, tensor_type=return_tensors)

    def get_image_patches(
        self,
        data: Image.Image,
        image_grid_pinpoints,
    ):
        """Split an image into a base (thumbnail) image plus tiles at its best-fit resolution."""
        if not isinstance(image_grid_pinpoints, list):
            raise TypeError("grid_pinpoints must be a list of possible resolutions.")

        best_resolution = select_best_resolution(data.size, image_grid_pinpoints)

        # Letterbox to the best-fit resolution, then cut it into base-size tiles.
        resized_data, scale = keepratio_resize(data, best_resolution, return_scale=True)
        resized_data = divide_to_patches(resized_data, self.size[0])
        # The thumbnail of the full image always comes first.
        ori_data = data.resize(self.size, Image.BICUBIC)
        data = [ori_data] + resized_data
        return data
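
    # Illustrative example (hypothetical size): with size=(336, 336) and the default
    # pinpoints, a 1000x750 image selects (672, 672) and yields 1 thumbnail + 4 tiles.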

    def pad(
        self,
        image: np.ndarray,
        padding: Union[int, Tuple[int, int], Iterable[Tuple[int, int]]],
        mode: PaddingMode = PaddingMode.CONSTANT,
        constant_values: Union[float, Iterable[float]] = 0.0,
        data_format: Optional[Union[str, ChannelDimension]] = None,
        input_data_format: Optional[Union[str, ChannelDimension]] = None,
    ) -> np.ndarray:
        """
        Pads the `image` with the specified `padding` and `mode`. Padding can be in the (`height`, `width`)
        dimensions or in the (`num_patches`) dimension. In the second case an iterable of tuples is expected
        as input.

        Args:
            image (`np.ndarray`):
                The image to pad.
            padding (`int` or `Tuple[int, int]` or `Iterable[Tuple[int, int]]`):
                Padding to apply to the edges of the height, width axes. Can be one of three formats:
                - `((before_height, after_height), (before_width, after_width))` unique pad widths for each axis.
                - `((before, after),)` yields same before and after pad for height and width.
                - `(pad,)` or int is a shortcut for before = after = pad width for all axes.
            mode (`PaddingMode`):
                The padding mode to use. Can be one of:
                - `"constant"`: pads with a constant value.
                - `"reflect"`: pads with the reflection of the vector mirrored on the first and last values of the
                  vector along each axis.
                - `"replicate"`: pads with the replication of the last value on the edge of the array along each axis.
                - `"symmetric"`: pads with the reflection of the vector mirrored along the edge of the array.
            constant_values (`float` or `Iterable[float]`, *optional*):
                The value to use for the padding if `mode` is `"constant"`.
            data_format (`str` or `ChannelDimension`, *optional*):
                The channel dimension format for the output image. Can be one of:
                - `"channels_first"` or `ChannelDimension.FIRST`: image in (num_channels, height, width) format.
                - `"channels_last"` or `ChannelDimension.LAST`: image in (height, width, num_channels) format.
                If unset, will use same as the input image.
            input_data_format (`str` or `ChannelDimension`, *optional*):
                The channel dimension format for the input image. Can be one of:
                - `"channels_first"` or `ChannelDimension.FIRST`: image in (num_channels, height, width) format.
                - `"channels_last"` or `ChannelDimension.LAST`: image in (height, width, num_channels) format.
                If unset, will use the inferred format of the input image.

        Returns:
            `np.ndarray`: The padded image.
        """
        # Delegate to the transformers `pad` helper when padding is on the (height, width) axes only.
        if isinstance(padding, int) or len(padding) != 4:
            return pad(image, padding, mode, constant_values, data_format, input_data_format)

        if input_data_format is None:
            input_data_format = infer_channel_dimension_format(image)
        if mode == PaddingMode.CONSTANT:
            image = np.pad(image, padding, mode="constant", constant_values=constant_values)
        elif mode == PaddingMode.REFLECT:
            image = np.pad(image, padding, mode="reflect")
        elif mode == PaddingMode.REPLICATE:
            image = np.pad(image, padding, mode="edge")
        elif mode == PaddingMode.SYMMETRIC:
            image = np.pad(image, padding, mode="symmetric")
        else:
            raise ValueError(f"Invalid padding mode: {mode}")
        image = (
            to_channel_dimension_format(image, data_format, input_data_format) if data_format is not None else image
        )
        return image

    def _pad_for_batching(
        self,
        pixel_values: List[np.ndarray],
        data_format: Optional[Union[str, ChannelDimension]] = None,
        input_data_format: Optional[Union[str, ChannelDimension]] = None,
    ):
        """
        Pads images on the `num_of_patches` dimension with zeros to form a batch with the same number of patches.

        Args:
            pixel_values (`List[np.ndarray]`):
                An array of pixel values for each image, of shape (`batch_size`, `num_patches`, `image_in_3D`)
            data_format (`str` or `ChannelDimension`, *optional*):
                The channel dimension format for the output image. Can be one of:
                - `"channels_first"` or `ChannelDimension.FIRST`: image in (num_channels, height, width) format.
                - `"channels_last"` or `ChannelDimension.LAST`: image in (height, width, num_channels) format.
                If unset, will use same as the input image.
            input_data_format (`str` or `ChannelDimension`, *optional*):
                The channel dimension format for the input image. Can be one of:
                - `"channels_first"` or `ChannelDimension.FIRST`: image in (num_channels, height, width) format.
                - `"channels_last"` or `ChannelDimension.LAST`: image in (height, width, num_channels) format.
                If unset, will use the inferred format of the input image.

        Returns:
            List[`np.ndarray`]: The padded images.
        """
        max_patch = max(len(x) for x in pixel_values)
        pixel_values = [
            self.pad(
                image,
                padding=((0, max_patch - image.shape[0]), (0, 0), (0, 0), (0, 0)),
                data_format=data_format,
                input_data_format=input_data_format,
            )
            for image in pixel_values
        ]

        return pixel_values
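
    # Illustrative example (hypothetical shapes): given per-image patch arrays of
    # shapes (5, 3, 336, 336) and (3, 3, 336, 336), the second is zero-padded on the
    # patch axis to (5, 3, 336, 336) so the batch can be stacked.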

    def anyres_for_vllm_preprocess(self, images, return_tensors: Optional[Union[str, TensorType]] = None, data_format: Optional[ChannelDimension] = ChannelDimension.FIRST, input_data_format: Optional[Union[str, ChannelDimension]] = None, do_pad: Optional[bool] = None, **kwargs):
        images = make_list_of_images(images)
        new_images = []
        image_sizes = []

        for image in images:
            ori_w, ori_h = image.size
            image_sizes.append([ori_h, ori_w])
            image_patches = self.get_image_patches(
                image,
                self.image_grid_pinpoints
            )
            all_images = []
            for patch in image_patches:
                transform_img = _transform(self.size[0], self.size[1], self.image_mean, self.image_std)(patch)
                img_array = to_numpy_array(transform_img)
                img_array = to_channel_dimension_format(img_array, data_format, input_channel_dim=input_data_format)
                all_images.append(img_array)
            # Stack the patches of one image into a single (num_patches, C, H, W) array.
            pixel_values = np.array(all_images)
            new_images.append(pixel_values)

        # Pad every image to the same number of patches so the batch can be stacked.
        new_images = self._pad_for_batching(new_images)

        data = {"pixel_values": new_images, "image_sizes": image_sizes}
        return BatchFeature(data=data, tensor_type=return_tensors)

    def anyres_preprocess(self, images, return_tensors: Optional[Union[str, TensorType]] = None, data_format: Optional[ChannelDimension] = ChannelDimension.FIRST, input_data_format: Optional[Union[str, ChannelDimension]] = None, do_pad: Optional[bool] = None, **kwargs):
        images = make_list_of_images(images)
        new_images = []
        image_sizes = []

        for image in images:
            ori_w, ori_h = image.size
            image_sizes.append([ori_h, ori_w])
            image_patches = self.get_image_patches(
                image,
                self.image_grid_pinpoints
            )
            # Patches of all images go into one flat list; `image_sizes` lets the
            # model recover the per-image grouping.
            for patch in image_patches:
                transform_img = _transform(self.size[0], self.size[1], self.image_mean, self.image_std)(patch)
                img_array = to_numpy_array(transform_img)
                img_array = to_channel_dimension_format(img_array, data_format, input_channel_dim=input_data_format)
                new_images.append(img_array)

        data = {"pixel_values": new_images, "image_sizes": image_sizes}
        return CustomBatchFeature(data=data, tensor_type=return_tensors)
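

# Minimal smoke test (illustrative only; the mean/std are the CLIP defaults used
# elsewhere in this file, and the input image is synthetic).
if __name__ == "__main__":
    processor = ImageProcessor(
        size=(336, 336),
        image_mean=(0.48145466, 0.4578275, 0.40821073),
        image_std=(0.26862954, 0.26130258, 0.27577711),
        process_image_mode='anyres',
    )
    dummy = Image.new('RGB', (1000, 750), (128, 128, 128))
    batch = processor.preprocess(dummy, return_tensors="pt")
    # Expect 1 thumbnail + 4 tiles; sizes are recorded as (height, width).
    print(len(batch["pixel_values"]), batch["image_sizes"])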