utils.mp_distribution
import logging
import subprocess

import psutil
import torch
import torch.multiprocessing as mp
class Distributer:
A utility class for managing CPU and GPU resource distribution across multiple processes.
Distributer()
    def __init__(self):
        """Initialize the multiprocessing distribution manager with CPU and GPU information."""
        self.cpu_cores = psutil.Process().cpu_affinity()
        self.n_cpu_cores = len(self.cpu_cores)
        self.n_gpus = torch.cuda.device_count()

        logging.info(f"CPU cores: {self.n_cpu_cores}. GPU count: {self.n_gpus}")

        # Check that GPUs are in the required 'Default' (shared) compute mode
        gpu_modes = self.get_gpu_compute_modes()
        for i, mode in enumerate(gpu_modes):
            if "default" not in mode.lower():
                logging.error(f"GPU {i} is in {mode}. Needs to be 'Default'.")
Initialize the multiprocessing distribution manager with CPU and GPU information.
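A minimal usage sketch; the attribute names come from the constructor above, while the values in the comments are purely illustrative:

from utils.mp_distribution import Distributer

distributer = Distributer()  # Logs the detected CPU cores and GPU count

print(distributer.cpu_cores)    # e.g. [0, 1, 2, 3, 4, 5, 6, 7] -- cores this process is allowed to use
print(distributer.n_cpu_cores)  # e.g. 8
print(distributer.n_gpus)       # e.g. 2 -- CUDA devices visible to torch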
def get_gpu_compute_modes(self):
    def get_gpu_compute_modes(self):
        """Retrieves the compute modes of all available GPUs using the `nvidia-smi` command."""
        try:
            result = subprocess.run(
                ["nvidia-smi", "--query-gpu=compute_mode", "--format=csv,noheader"],
                capture_output=True,
                text=True,
                check=True,
            )
            modes = result.stdout.strip().split("\n")
            return modes
        except (subprocess.CalledProcessError, FileNotFoundError) as e:
            # Include the exception in the log message (logging.error does not
            # take it as a bare second argument); also covers a missing binary.
            logging.error(f"Error running nvidia-smi: {e}")
            return []
Retrieves the compute modes of all available GPUs using the nvidia-smi command.
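The method wraps the nvidia-smi query shown in the source and returns one entry per GPU; nvidia-smi typically reports modes such as Default, Exclusive_Process, or Prohibited. A brief, illustrative usage sketch:

modes = Distributer().get_gpu_compute_modes()
# Equivalent shell command: nvidia-smi --query-gpu=compute_mode --format=csv,noheader
# Illustrative result on a two-GPU machine in shared mode: ['Default', 'Default']
# If the query fails, the error is logged and an empty list is returned.
print(modes)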
def distribute_cpu_cores(self, cpu_cores, n_processes):
    def distribute_cpu_cores(self, cpu_cores, n_processes):
        """Distributes a list of CPU cores among a specified number of processes."""
        n_cores = len(cpu_cores)

        chunk_size = n_cores // n_processes
        remainder = n_cores % n_processes

        cpu_cores_per_process = []
        start = 0
        for i in range(n_processes):
            # Determine the end index for this chunk; the first 'remainder' processes get one extra core
            end = start + chunk_size + (1 if i < remainder else 0)
            cpu_cores_per_process.append(cpu_cores[start:end])
            start = end

        return cpu_cores_per_process
Distributes a list of CPU cores among a specified number of processes.
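A worked example of the splitting logic: with 10 cores and 3 processes, each process gets 10 // 3 = 3 cores, and the single leftover core (10 % 3 = 1) goes to the first process. The core IDs below are illustrative:

cores = list(range(10))  # Illustrative core IDs 0-9
print(Distributer().distribute_cpu_cores(cores, 3))
# [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]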
class ZeroShotDistributer(Distributer):
A distributer class that handles zero-shot object detection tasks across multiple GPUs using multiprocessing.
ZeroShotDistributer(config, n_samples, dataset_info, detector)
    def __init__(self, config, n_samples, dataset_info, detector):
        """Initializes the detector runner with configuration, sample count, dataset information, and detector."""
        super().__init__()  # Call the parent class's __init__ method
        self.config = config
        self.n_samples = n_samples
        self.dataset_info = dataset_info
        self.detector = detector
Initializes the detector runner with configuration, sample count, dataset information, and detector.
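The constructor only stores its arguments; the configuration keys consumed later by distribute_and_run() are sketched below. The model ID, dataset name, sample count, and detector object are illustrative placeholders, but the key names match the source:

config = {
    "hf_models_zeroshot_objectdetection": {
        "some-org/zero-shot-detector": {   # Placeholder Hugging Face model ID
            "batch_size": 8,
            "n_dataset_chunks": 2,         # Split this model's inference into 2 runs
        },
    },
    "n_post_processing_worker_per_inference_worker": 2,
}
dataset_info = {"name": "my_dataset"}      # Only the "name" key is read

# my_detector is assumed to provide the worker callables used by distribute_and_run():
# update_queue_sizes_worker, process_outputs_worker, and gpu_worker.
runner = ZeroShotDistributer(config, n_samples=1000, dataset_info=dataset_info, detector=my_detector)
runner.distribute_and_run()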
def distribute_and_run(self):
    def distribute_and_run(self):
        """Distributes inference tasks across multiple GPUs and manages post-processing workers with queues for efficient parallel processing."""
        dataset_name = self.dataset_info["name"]
        models_dict = self.config["hf_models_zeroshot_objectdetection"]
        n_post_worker = self.config["n_post_processing_worker_per_inference_worker"]

        runs = []
        run_id = 0

        for model_name in models_dict:
            batch_size = models_dict[model_name]["batch_size"]
            n_chunks = models_dict[model_name]["n_dataset_chunks"]

            # Calculate the base split size and leftover samples
            chunk_size, leftover_samples = divmod(self.n_samples, n_chunks)

            chunk_index_start = 0
            chunk_index_end = None
            for split_id in range(n_chunks):

                # Prepare torch subsets
                if n_chunks == 1:
                    is_subset = False
                else:
                    is_subset = True
                    # The last chunk absorbs the leftover samples
                    chunk_size += leftover_samples if split_id == n_chunks - 1 else 0
                    chunk_index_end = chunk_index_start + chunk_size

                # Add entry to runs
                runs.append(
                    {
                        "run_id": run_id,
                        "model_name": model_name,
                        "is_subset": is_subset,
                        "chunk_index_start": chunk_index_start,
                        "chunk_index_end": chunk_index_end,
                        "batch_size": batch_size,
                        "dataset_name": dataset_name,
                    }
                )

                # Update start index for next chunk
                if n_chunks > 1:
                    chunk_index_start += chunk_size

                run_id += 1

        n_runs = len(runs)

        logging.info(f"Running with multiprocessing on {self.n_gpus} GPUs.")
        n_parallel_processes = min(self.n_gpus, n_runs)

        n_post_processing_workers = int(n_post_worker * n_parallel_processes)
        n_total_workers = n_parallel_processes + n_post_processing_workers

        # Create result queues, max one per GPU
        result_queues = []
        max_queue_size = 3  # Balance between flexibility and available GPU memory for inference (data stays on GPU for post-processing)
        for index in range(n_parallel_processes):
            results_queue = mp.Queue(maxsize=max_queue_size)  # Ensure that no GPU memory overflow occurs
            result_queues.append(results_queue)

        # Dedicated worker that continuously measures the lengths of the result queues
        result_queues_sizes = mp.Manager().list([0] * n_parallel_processes)
        largest_queue_index = mp.Value("i", -1)
        size_updater = mp.Process(
            target=self.detector.update_queue_sizes_worker,
            args=(
                result_queues,
                result_queues_sizes,
                largest_queue_index,
                max_queue_size,
            ),
        )
        size_updater.start()

        # Create queue with all planned runs
        task_queue = mp.Queue()
        for run_id, run_metadata in enumerate(runs):
            logging.info(f"Run {run_id} Metadata: {run_metadata}")
            task_queue.put(run_metadata)

        # Flags for synchronization
        inference_finished = mp.Value("b", False)
        post_processing_finished = mp.Value("b", False)

        # Distribute CPU cores
        if n_total_workers > len(self.cpu_cores):
            logging.error(
                f"Launching {n_total_workers} processes with only {len(self.cpu_cores)} CPU cores."
            )

        cpu_cores_post_processing = self.cpu_cores[:n_post_processing_workers]
        cpu_cores_inference = self.cpu_cores[n_post_processing_workers:]
        cpu_cores_per_process = self.distribute_cpu_cores(
            cpu_cores_inference, n_parallel_processes
        )

        # Create post-processing worker processes
        post_processing_processes = []
        for i in range(n_post_processing_workers):
            p = mp.Process(
                target=self.detector.process_outputs_worker,
                args=(
                    result_queues,
                    largest_queue_index,
                    inference_finished,
                    max_queue_size,
                ),
            )
            post_processing_processes.append(p)
            p.start()
            logging.info(f"Started post-processing worker {i}")  # Use the loop variable, not a stale 'index'

        # Create inference worker processes, max one per GPU
        inference_processes = []
        gpu_worker_done_events = [
            mp.Event() for _ in range(n_parallel_processes)
        ]  # Signals when a GPU worker is done

        for index in range(n_parallel_processes):
            gpu_id = index
            cpu_cores_for_run = cpu_cores_per_process[gpu_id]
            results_queue = result_queues[index]
            done_event = gpu_worker_done_events[index]
            p = mp.Process(
                target=self.detector.gpu_worker,
                args=(
                    gpu_id,
                    cpu_cores_for_run,
                    task_queue,
                    results_queue,
                    done_event,
                    post_processing_finished,
                ),
            )
            inference_processes.append(p)
            p.start()
            logging.info(f"Started inference worker {index} for GPU {gpu_id}")

        logging.info(
            f"Started {len(post_processing_processes)} post-processing workers and {len(inference_processes)} GPU inference workers."
        )

        # Busy-wait until every GPU worker signals that its inference tasks are complete
        while not all(worker_event.is_set() for worker_event in gpu_worker_done_events):
            continue
        logging.info("All workers have finished inference tasks.")
        inference_finished.value = True

        # Wait for results processing to finish
        for p in post_processing_processes:
            p.join()
        logging.info("Results processing workers have shut down.")
        post_processing_finished.value = True

        # Wait for inference workers to finish
        for p in inference_processes:
            p.join()
        logging.info("All inference workers have shut down.")

        # Stop the worker that tracks queue sizes
        size_updater.terminate()

        # Close queues
        task_queue.close()
        for results_queue in result_queues:  # Close every result queue, not just the last one
            results_queue.close()
        logging.info("All multiprocessing queues are closed.")
Distributes inference tasks across multiple GPUs and manages post-processing workers with queues for efficient parallel processing.
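The run-planning step at the top of the method splits each model's dataset into chunks with divmod, letting the last chunk absorb any leftover samples. A standalone sketch of that arithmetic with illustrative numbers (1000 samples, 3 chunks):

n_samples, n_chunks = 1000, 3
chunk_size, leftover_samples = divmod(n_samples, n_chunks)  # 333 per chunk, 1 left over
chunk_index_start = 0
for split_id in range(n_chunks):
    size = chunk_size + (leftover_samples if split_id == n_chunks - 1 else 0)
    chunk_index_end = chunk_index_start + size
    print(split_id, chunk_index_start, chunk_index_end)
    chunk_index_start = chunk_index_end
# 0 0 333
# 1 333 666
# 2 666 1000  <- the last chunk absorbs the leftover sample

Each planned run is then placed on a shared task queue, while every GPU worker writes into its own result queue bounded at maxsize 3, so back-pressure from post-processing keeps GPU memory in check (results stay on the GPU until they are post-processed).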