Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-43221][CORE] Host local block fetching should use a block status of a block stored on disk #50122

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

attilapiros
Copy link
Contributor

@attilapiros attilapiros commented Mar 1, 2025

Thanks for @yorksity who reported this error and even provided a PR for it.
This solution very different from #40883 as BlockManagerMasterEndpoint#getLocationsAndStatus() needed some refactoring.

What changes were proposed in this pull request?

This PR fixes an error which can be manifested in the following exception:

25/02/20 09:58:31 ERROR util.Utils: [Executor task launch worker for task 61.0 in stage 67.0 (TID 9391)]: Exception encountered
java.lang.ArrayIndexOutOfBoundsException: 0
  at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBlocks$1(TorrentBroadcast.scala:185) ~[spark-core_2.12-3.3.2.3.3.7190.5-2.jar:3.3.2.3.3.7190.5-2]
  at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23) ~[scala-library-2.12.15.jar:?]
  at scala.collection.immutable.List.foreach(List.scala:431) ~[scala-library-2.12.15.jar:?]
  at org.apache.spark.broadcast.TorrentBroadcast.readBlocks(TorrentBroadcast.scala:171) ~[spark-core_2.12-3.3.2.3.3.7190.5-2.jar:3.3.2.3.3.7190.5-2]                                

The PR is changing BlockManagerMasterEndpoint#getLocationsAndStatus().

The BlockManagerMasterEndpoint#getLocationsAndStatus() function is giving back an optional BlockLocationsAndStatus which consist of 3 parts:

  • locations: all the locations where the block can be found (as a sequence of block manager IDs)
  • status: one block status
  • localDirs: optional directory paths which can be used to read block if the block is found in the disk of an executor running on the same host

The block (either RDD blocks, shuffle blocks or torrent blocks) can be stored in many executors with different storage levels: disk or memory.

This PR changing how the block status and the block manager ID for the localDirs is found to guarantee they belong together.

Why are the changes needed?

Before this PR the BlockManagerMasterEndpoint#getLocationsAndStatus() was searching for the block status (status) and the localDirs separately. The block status actually was computed as the very first one where the block can be found. This way it can easily happen this block status was representing an in-memory block (where the disk size is 0 as it is stored in the memory) but the localDirs was filled out based on a host local block instance which was stored on disk.

This situation can be very frequent but only causing problems (exceptions as above) when encryption is on (spark.io.encryption.enabled=true) as for a not encrypted block the whole file containing the block is read, see
https://github.com/apache/spark/blob/branch-3.5/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L1244

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Host local block fetching was already covered by some existing unit tests but a new unit test is provided for this exact case: "SPARK-43221: Host local block fetching should use a block status with disk size".

The number of block mangers and the order of the blocks was chosen after some experimentation as the block status order is depends on a HashSet, see:

  private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]

This test was executed with the old code too to validate the issue is reproduced:

BlockManagerSuite:
OpenJDK 64-Bit Server VM warning: Sharing is only supported for boot loader classes because bootstrap classpath has been appended
- SPARK-43221: Host local block fetching should use a block status with disk size *** FAILED ***
  0 was not greater than 0 The block size must be greater than 0 for a nonempty block! (BlockManagerSuite.scala:491)
Run completed in 6 seconds, 705 milliseconds.
Total number of tests run: 1
Suites: completed 1, aborted 0
Tests: succeeded 0, failed 1, canceled 0, ignored 0, pending 0
*** 1 TEST FAILED ***

Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions github-actions bot added the CORE label Mar 1, 2025
@attilapiros attilapiros force-pushed the SPARK-43221 branch 2 times, most recently from 695e1c4 to af8d474 Compare March 1, 2025 16:42
@attilapiros attilapiros changed the title [WIP][SPARK-43221][CORE] Host local block fetching should use a block status of a block stored on disk [SPARK-43221][CORE] Host local block fetching should use a block status of a block stored on disk Mar 2, 2025
@attilapiros
Copy link
Contributor Author

cc @mridulm, @Ngone51

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant