[3.15] gh-101267: ProcessPoolExecutor no longer shares 1 BrokenProcessPool exception among all failed futures (GH-101268) (#151430)

gh-101267: ProcessPoolExecutor no longer shares 1 BrokenProcessPool exception among all failed futures (GH-101268)
(cherry picked from commit 3c00ebc2bbd902495b163def850bc931420209fc)

Co-authored-by: Daniel Shields <daniel.shields@twosigma.com>
Co-authored-by: blurb-it[bot] <43283697+blurb-it[bot]@users.noreply.github.com>
Co-authored-by: Gregory P. Smith <greg@krypto.org>
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index ed24cc2..10d4ac8 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -469,11 +469,9 @@ def _terminate_broken(self, cause):
             executor._shutdown_thread = True
             executor = None
 
-        # All pending tasks are to be marked failed with the following
-        # BrokenProcessPool error
-        bpe = BrokenProcessPool("A process in the process pool was "
-                                "terminated abruptly while the future was "
-                                "running or pending.")
+        # All pending tasks are to be marked failed with a
+        # BrokenProcessPool error, as separate instances to avoid sharing
+        # a traceback (gh-101267).
         cause_str = None
         if cause is not None:
             cause_str = ''.join(cause)
@@ -489,11 +487,15 @@ def _terminate_broken(self, cause):
                                   f"with exit code {p.exitcode}")
             if errors:
                 cause_str = "\n".join(errors)
-        if cause_str:
-            bpe.__cause__ = _RemoteTraceback(f"\n'''\n{cause_str}'''")
+        cause_tb = f"\n'''\n{cause_str}'''" if cause_str else None
 
         # Mark pending tasks as failed.
         for work_id, work_item in self.pending_work_items.items():
+            bpe = BrokenProcessPool("A process in the process pool was "
+                                    "terminated abruptly while the future was "
+                                    "running or pending.")
+            if cause_tb is not None:
+                bpe.__cause__ = _RemoteTraceback(cause_tb)
             try:
                 work_item.future.set_exception(bpe)
             except _base.InvalidStateError:
diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py
index 731419a..56cc389 100644
--- a/Lib/test/test_concurrent_futures/test_process_pool.py
+++ b/Lib/test/test_concurrent_futures/test_process_pool.py
@@ -3,6 +3,7 @@
 import sys
 import threading
 import time
+import traceback
 import unittest
 import unittest.mock
 from concurrent import futures
@@ -63,6 +64,32 @@ def test_killed_child(self):
         self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)
 
     @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
+    def test_broken_process_pool_traceback(self):
+        # When a child process is abruptly terminated, the whole pool gets
+        # "broken", and a BrokenProcessPool exception should be created
+        # for each future instead of sharing one exception among all futures.
+        event = self.create_event()
+        futures = [self.executor.submit(event.wait) for _ in range(3)]
+        p = next(iter(self.executor._processes.values()))
+        p.terminate()
+        for fut in futures:
+            # Don't use assertRaises(): it clears the traceback off exc.
+            try:
+                fut.result()
+            except BrokenProcessPool as exc:
+                tb = exc.__traceback__
+            else:
+                self.fail("BrokenProcessPool not raised")
+            count = sum(
+                1
+                for frame_summary in traceback.extract_tb(tb)
+                if frame_summary.filename == __file__
+            )
+            # This code file should appear exactly once in the traceback.
+            # A shared exception would accumulate a frame per result() call.
+            self.assertEqual(count, 1)
+
+    @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
     def test_map_chunksize(self):
         def bad_map():
             list(self.executor.map(pow, range(40), range(40), chunksize=-1))
diff --git a/Misc/ACKS b/Misc/ACKS
index 14f0db7..38817b1 100644
--- a/Misc/ACKS
+++ b/Misc/ACKS
@@ -1762,6 +1762,7 @@
 Bruce Sherwood
 Gregory Shevchenko
 Hai Shi
+Daniel Shields
 Alexander Shigin
 Pete Shinners
 Michael Shiplett
diff --git a/Misc/NEWS.d/next/Library/2023-01-23-21-23-50.gh-issue-101267._f-cFH.rst b/Misc/NEWS.d/next/Library/2023-01-23-21-23-50.gh-issue-101267._f-cFH.rst
new file mode 100644
index 0000000..901a3fb
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2023-01-23-21-23-50.gh-issue-101267._f-cFH.rst
@@ -0,0 +1,7 @@
+When a worker process terminates unexpectedly,
+:class:`concurrent.futures.ProcessPoolExecutor` now sets a separate
+:exc:`~concurrent.futures.process.BrokenProcessPool` exception on each pending
+future instead of sharing a single instance among them all.  Sharing one
+exception produced malformed tracebacks: each
+:meth:`Future.result() <concurrent.futures.Future.result>` call re-raised the
+same object, appending another copy of the traceback to it.