utils.mp_distribution

import logging
import subprocess

import psutil
import torch
import torch.multiprocessing as mp


class Distributer:
    """A utility class for managing CPU and GPU resource distribution across multiple processes."""

    def __init__(self):
        """Initialize the multiprocessing distribution manager with CPU and GPU information."""
        self.cpu_cores = psutil.Process().cpu_affinity()
        self.n_cpu_cores = len(self.cpu_cores)
        self.n_gpus = torch.cuda.device_count()

        logging.info(f"CPU cores: {self.n_cpu_cores}. GPU count: {self.n_gpus}")

        # Check that GPUs are in the required 'Default' (shared) compute mode:
        gpu_modes = self.get_gpu_compute_modes()
        for i, mode in enumerate(gpu_modes):
            if "default" not in mode.lower():
                logging.error(f"GPU {i} is in compute mode '{mode}'. It needs to be 'Default'.")

    def get_gpu_compute_modes(self):
        """Retrieves the compute modes of all available GPUs using the `nvidia-smi` command."""
        try:
            result = subprocess.run(
                ["nvidia-smi", "--query-gpu=compute_mode", "--format=csv,noheader"],
                capture_output=True,
                text=True,
                check=True,
            )
            modes = result.stdout.strip().split("\n")
            return modes
        except subprocess.CalledProcessError as e:
            logging.error("Error running nvidia-smi: %s", e)
            return []

    def distribute_cpu_cores(self, cpu_cores, n_processes):
        """Distributes a list of CPU cores among a specified number of processes."""
        n_cores = len(cpu_cores)

        chunk_size = n_cores // n_processes
        remainder = n_cores % n_processes

        cpu_cores_per_process = []
        start = 0
        for i in range(n_processes):
            # Determine the end index for this chunk; the first `remainder` chunks get one extra core
            end = start + chunk_size + (1 if i < remainder else 0)
            cpu_cores_per_process.append(cpu_cores[start:end])
            start = end

        return cpu_cores_per_process


class ZeroShotDistributer(Distributer):
    """A distributer class that handles zero-shot object detection tasks across multiple GPUs using multiprocessing."""

    def __init__(self, config, n_samples, dataset_info, detector):
        """Initializes the detector runner with configuration, sample count, dataset information, and detector."""
        super().__init__()  # Call the parent class's __init__ method
        self.config = config
        self.n_samples = n_samples
        self.dataset_info = dataset_info
        self.detector = detector

    def distribute_and_run(self):
        """Distributes inference tasks across multiple GPUs and manages post-processing workers with queues for efficient parallel processing."""
        dataset_name = self.dataset_info["name"]
        models_dict = self.config["hf_models_zeroshot_objectdetection"]
        n_post_worker = self.config["n_post_processing_worker_per_inference_worker"]

        runs = []
        run_id = 0

        for model_name in models_dict:
            batch_size = models_dict[model_name]["batch_size"]
            n_chunks = models_dict[model_name]["n_dataset_chunks"]

            # Calculate the base split size and leftover samples
            chunk_size, leftover_samples = divmod(self.n_samples, n_chunks)

            chunk_index_start = 0
            chunk_index_end = None
            for split_id in range(n_chunks):

                # Prepare torch subsets
                if n_chunks == 1:
                    is_subset = False
                else:
                    is_subset = True
                    chunk_size += leftover_samples if split_id == n_chunks - 1 else 0
                    chunk_index_end = chunk_index_start + chunk_size

                # Add entry to runs
                runs.append(
                    {
                        "run_id": run_id,
                        "model_name": model_name,
                        "is_subset": is_subset,
                        "chunk_index_start": chunk_index_start,
                        "chunk_index_end": chunk_index_end,
                        "batch_size": batch_size,
                        "dataset_name": dataset_name,
                    }
                )

                # Update start index for next chunk
                if n_chunks > 1:
                    chunk_index_start += chunk_size

                run_id += 1

        n_runs = len(runs)

        logging.info(f"Running with multiprocessing on {self.n_gpus} GPUs.")
        n_parallel_processes = min(self.n_gpus, n_runs)

        n_post_processing_workers = int(n_post_worker * n_parallel_processes)
        n_total_workers = n_parallel_processes + n_post_processing_workers

        # Create result queues, at most one per GPU
        result_queues = []
        max_queue_size = 3  # Balance between flexibility and available GPU memory for inference (data stays on GPU for post-processing)
        for index in range(n_parallel_processes):
            results_queue = mp.Queue(
                maxsize=max_queue_size
            )  # Ensure that no GPU memory overflow occurs
            result_queues.append(results_queue)

        # Dedicated worker that continuously measures the lengths of the result queues
        result_queues_sizes = mp.Manager().list([0] * n_parallel_processes)
        largest_queue_index = mp.Value("i", -1)
        size_updater = mp.Process(
            target=self.detector.update_queue_sizes_worker,
            args=(
                result_queues,
                result_queues_sizes,
                largest_queue_index,
                max_queue_size,
            ),
        )
        size_updater.start()

        # Create queue with all planned runs
        task_queue = mp.Queue()
        for run_id, run_metadata in enumerate(runs):
            logging.info(f"Run {run_id} Metadata: {run_metadata}")
            task_queue.put(run_metadata)

        # Flags for synchronization
        inference_finished = mp.Value("b", False)
        post_processing_finished = mp.Value("b", False)

        # Distribute CPU cores
        if (n_parallel_processes + n_post_processing_workers) > len(self.cpu_cores):
            logging.error(
                f"Launching {n_parallel_processes + n_post_processing_workers} processes with only {len(self.cpu_cores)} CPU cores."
            )

        cpu_cores_post_processing = self.cpu_cores[:n_post_processing_workers]
        cpu_cores_inference = self.cpu_cores[n_post_processing_workers:]
        cpu_cores_per_process = self.distribute_cpu_cores(
            cpu_cores_inference, n_parallel_processes
        )

        # Create post-processing worker processes
        post_processing_processes = []
        for i in range(n_post_processing_workers):
            p = mp.Process(
                target=self.detector.process_outputs_worker,
                args=(
                    result_queues,
                    largest_queue_index,
                    inference_finished,
                    max_queue_size,
                ),
            )
            post_processing_processes.append(p)
            p.start()
            logging.info(f"Started post-processing worker {i}")

        # Create worker processes, at most one per GPU
        inference_processes = []
        gpu_worker_done_events = [
            mp.Event() for _ in range(n_parallel_processes)
        ]  # Signals when a GPU worker is done

        for index in range(n_parallel_processes):
            gpu_id = index
            cpu_cores_for_run = cpu_cores_per_process[gpu_id]
            results_queue = result_queues[index]
            done_event = gpu_worker_done_events[index]
            p = mp.Process(
                target=self.detector.gpu_worker,
                args=(
                    gpu_id,
                    cpu_cores_for_run,
                    task_queue,
                    results_queue,
                    done_event,
                    post_processing_finished,
                ),
            )
            inference_processes.append(p)
            p.start()
            logging.info(f"Started inference worker {index} for GPU {gpu_id}")

        logging.info(
            f"Started {len(post_processing_processes)} post-processing workers and {len(inference_processes)} GPU inference workers."
        )

        # Wait for all inference tasks to complete
        while not all(worker_event.is_set() for worker_event in gpu_worker_done_events):
            continue
        logging.info("All workers have finished inference tasks.")
        inference_finished.value = True

        # Wait for results processing to finish
        for p in post_processing_processes:
            p.join()
        logging.info("All results processing workers have shut down.")
        post_processing_finished.value = True

        # Wait for inference workers to finish
        for p in inference_processes:
            p.join()
        logging.info("All inference workers have shut down.")

        # Stop the process updating queue sizes
        size_updater.terminate()

        # Close queues
        task_queue.close()
        for results_queue in result_queues:
            results_queue.close()
        logging.info("All multiprocessing queues are closed.")
class Distributer:

A utility class for managing CPU and GPU resource distribution across multiple processes.

Distributer()

Initialize the multiprocessing distribution manager with CPU and GPU information.

cpu_cores
n_cpu_cores
n_gpus
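
A minimal usage sketch, assuming the module is importable as utils.mp_distribution (as the page title suggests); the values in the comments are illustrative. Constructing Distributer reads the CPU affinity of the current process and the number of visible CUDA devices, and logs an error for every GPU that is not in 'Default' compute mode.

from utils.mp_distribution import Distributer

distributer = Distributer()  # logs e.g. "CPU cores: 32. GPU count: 4"

print(distributer.cpu_cores)    # core ids usable by this process, e.g. [0, 1, ..., 31]
print(distributer.n_cpu_cores)  # e.g. 32
print(distributer.n_gpus)       # e.g. 4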
def get_gpu_compute_modes(self):

Retrieves the compute modes of all available GPUs using the nvidia-smi command.
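
An illustrative call, assuming nvidia-smi is installed and on the PATH; the returned list holds one compute-mode string per GPU, and an empty list is returned if the command fails.

modes = Distributer().get_gpu_compute_modes()
print(modes)  # e.g. ["Default", "Default"]; a value such as "Exclusive_Process" triggers the error logged in __init__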

def distribute_cpu_cores(self, cpu_cores, n_processes):

Distributes a list of CPU cores among a specified number of processes.
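
A worked example of the splitting logic: with 8 cores and 3 processes, chunk_size = 8 // 3 = 2 and remainder = 8 % 3 = 2, so the first two processes each receive one extra core.

distributer = Distributer()
print(distributer.distribute_cpu_cores([0, 1, 2, 3, 4, 5, 6, 7], 3))
# [[0, 1, 2], [3, 4, 5], [6, 7]]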

class ZeroShotDistributer(Distributer):

A distributer class that handles zero-shot object detection tasks across multiple GPUs using multiprocessing.

ZeroShotDistributer(config, n_samples, dataset_info, detector)

Initializes the detector runner with configuration, sample count, dataset information, and detector.

config
n_samples
dataset_info
detector
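
A sketch of the expected inputs, derived from the keys accessed in distribute_and_run; the model identifiers, dataset name, and detector object are hypothetical placeholders.

config = {
    "n_post_processing_worker_per_inference_worker": 2,
    "hf_models_zeroshot_objectdetection": {
        # hypothetical model entries; each entry needs "batch_size" and "n_dataset_chunks"
        "org-a/zero-shot-detector-a": {"batch_size": 8, "n_dataset_chunks": 1},
        "org-b/zero-shot-detector-b": {"batch_size": 4, "n_dataset_chunks": 2},
    },
}
dataset_info = {"name": "my_dataset"}  # only "name" is read here
detector = ...  # placeholder: must provide gpu_worker, process_outputs_worker, and update_queue_sizes_worker

distributer = ZeroShotDistributer(config, n_samples=1000, dataset_info=dataset_info, detector=detector)
distributer.distribute_and_run()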
def distribute_and_run(self):

Distributes inference tasks across multiple GPUs and manages post-processing workers with queues for efficient parallel processing.
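
A worked example of the chunking that fills the task queue: with n_samples = 1000 and a model configured with n_dataset_chunks = 3, divmod(1000, 3) yields a base chunk size of 333 with 1 leftover sample, which is added to the final chunk. Each chunk becomes one run entry, and min(n_gpus, n_runs) GPU workers pull entries from the task queue in parallel.

n_samples, n_chunks = 1000, 3
chunk_size, leftover_samples = divmod(n_samples, n_chunks)  # (333, 1)
# run 0: chunk_index_start = 0,   chunk_index_end = 333
# run 1: chunk_index_start = 333, chunk_index_end = 666
# run 2: chunk_index_start = 666, chunk_index_end = 1000  (last chunk absorbs the leftover sample)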