Loading Data Into Neptune With Spark
For a project, we needed to load a single data mart table from the DWH into AWS Neptune. Below are some of the gotchas we ran into and the solutions we found along the way.
Amazon provides a method for bulk-loading data into Neptune: you upload the data to an S3 bucket and Neptune loads it from there. The data must follow one of a few prescribed formats; in this case the Gremlin CSV format was chosen. This format boils down to one or more CSV files per node type and per edge type. Note that you can add data types to the column names, e.g. "firstName:String".
Neptune can conveniently load the separate sub-directories and partitioned CSV files from the S3 location, although you will see a "load failure" per sub-directory because of the _SUCCESS file that the Spark writer leaves behind.
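To make the Gremlin CSV layout concrete, here is a minimal sketch in plain Python that renders one vertex file and one edge file. The sample rows, labels and the helper name to_gremlin_csv are made up for illustration; only the ~id, ~label, ~from and ~to system columns and the "name:Type" header convention come from the format itself.

```python
import csv
import io

# Hypothetical sample data: one vertex type ("person") and one edge type ("knows").
people = [
    {"~id": "p1", "~label": "person", "firstName:String": "Ada", "age:Int": 36},
    {"~id": "p2", "~label": "person", "firstName:String": "Grace", "age:Int": 45},
]
knows = [
    {"~id": "e1", "~from": "p1", "~to": "p2", "~label": "knows", "since:Int": 2019},
]


def to_gremlin_csv(rows):
    """Render a list of dicts as CSV text, using the dict keys as the header row."""
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=list(rows[0].keys()), lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)
    return buf.getvalue()


vertex_csv = to_gremlin_csv(people)  # one file (or directory) per vertex type
edge_csv = to_gremlin_csv(knows)     # one file (or directory) per edge type
print(vertex_csv)
print(edge_csv)
```

In the Spark job the same headers would simply be the column names of the DataFrame that is written out as CSV with a header row.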
Hashing it out
When you need to load a denormalized table, there are roughly two strategies you can follow for populating the ~id fields when you split the data into vertices and edges. One is naive, the other is robust:
- Add extra columns for each vertex and edge id and have Spark fill them with monotonically_increasing_id(). This might be tempting, because you can more easily fill the ~to and ~from fields of the edges.
- Use hashing based on stable fields. The ~to and ~from fields of the edges can then be filled using the same hash functions based on the input DataFrame that contains the source table.
The second option is the more robust approach, because the ids will always be the same unless the data itself changes. If you are loading live data (i.e. non-historic) that is still subject to change, try to pick columns that are stable, because that avoids the need for costly look-ups into Neptune before upserts. Of course, there are other ways around this too, for example getting deltas from the source.
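The hashing strategy can be sketched in plain Python as follows. This is an illustration of the idea, not the code from the project: the function name stable_id, the choice of SHA-256, the separator and the example columns are all assumptions.

```python
import hashlib


def stable_id(label, *key_fields):
    """Derive a deterministic id from a label and the values of stable columns."""
    # Joining with a separator that should not occur in the data avoids
    # collisions such as ("ab", "c") versus ("a", "bc").
    raw = "\x1f".join([label, *(str(field) for field in key_fields)])
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


# Hypothetical denormalized row holding a customer and the product they bought.
row = {"customer_email": "ada@example.com", "product_sku": "SKU-42"}

customer_id = stable_id("customer", row["customer_email"])
product_id = stable_id("product", row["product_sku"])

# The edge reuses the same function on the same source columns, so ~from and
# ~to match the vertex ids without any look-up or join.
edge = {
    "~id": stable_id("bought", row["customer_email"], row["product_sku"]),
    "~from": customer_id,
    "~to": product_id,
    "~label": "bought",
}
```

In a Spark job the same effect can be had with the built-in column functions, for instance sha2 over concat_ws from pyspark.sql.functions, so that the hashing runs on the executors rather than in a Python UDF.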
When loading multiple tables with entities you might think of a third option: using primary keys as ids. Don't do that. First, it is a security risk (granted, your Neptune instance is likely still internal, but the cat's out of the bag). Second, primary keys are likely to change when migrating or reinserting data.
Changing Committers
When developing locally, you are likely to see the warning below:
Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
This can be fixed by adding the settings below to spark-defaults.conf. You don't necessarily need Hortonworks' spark-cloud artifact mentioned elsewhere.
Config Key | Value |
---|---|
spark.hadoop.fs.s3a.impl | org.apache.hadoop.fs.s3a.S3AFileSystem |
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version | 2 |
spark.hadoop.fs.s3a.committer.name | directory |
spark.hadoop.fs.s3a.committer.magic.enabled | false |
spark.hadoop.fs.s3a.committer.staging.conflict-mode | replace |
spark.hadoop.fs.s3a.committer.staging.unique-filenames | true |
spark.hadoop.fs.s3a.committer.staging.abort.pending.uploads | true |
spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a | org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory |
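Since spark-defaults.conf expects one whitespace-separated key and value per line rather than a table, a small helper can render the settings in that shape. This is only a convenience sketch; the names committer_conf and to_spark_defaults are invented here.

```python
# Settings from the table above.
committer_conf = {
    "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version": "2",
    "spark.hadoop.fs.s3a.committer.name": "directory",
    "spark.hadoop.fs.s3a.committer.magic.enabled": "false",
    "spark.hadoop.fs.s3a.committer.staging.conflict-mode": "replace",
    "spark.hadoop.fs.s3a.committer.staging.unique-filenames": "true",
    "spark.hadoop.fs.s3a.committer.staging.abort.pending.uploads": "true",
    "spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a":
        "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory",
}


def to_spark_defaults(conf):
    """Render key/value pairs as spark-defaults.conf lines (key, whitespace, value)."""
    width = max(len(key) for key in conf)
    return "\n".join(f"{key.ljust(width)}  {value}" for key, value in conf.items())


spark_defaults_text = to_spark_defaults(committer_conf)
print(spark_defaults_text)
```

If you prefer not to touch spark-defaults.conf, the same pairs can also be passed one by one to SparkSession.builder.config(key, value) when the session is created.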
Dependency Hell
If you are here from the far future you can probably ignore this paragraph, but on a side note: The solution was developed for AWS emr-6.2.0, using Spark 3.0.1 built for Hadoop 3.2+, and the right versions on Maven. Yet we still got this Guava mismatch.
py4j.protocol.Py4JJavaError: An error occurred while calling o129.save.
: java.lang.NoSuchMethodError: 'void com.google.common.base.Preconditions.checkArgument(boolean, java.lang.String, java.lang.Object, java.lang.Object)'
at org.apache.hadoop.fs.s3a.S3AUtils.lookupPassword(S3AUtils.java:816)
at org.apache.hadoop.fs.s3a.S3AUtils.lookupPassword(S3AUtils.java:792)
at org.apache.hadoop.fs.s3a.S3AUtils.getAWSAccessKeys(S3AUtils.java:747)
at org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider.<init>(SimpleAWSCredentialsProvider.java:58)
at org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet(S3AUtils.java:600)
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:260)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3303)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3352)
After trying lots of different versions, the answer was, of course, found on StackOverflow: what was needed was a patch-version downgrade, from 3.2.1 to 3.2.0. The final submit command looks something like this:
$ spark-3.0.1-bin-hadoop3.2/bin/spark-submit --packages com.amazonaws:aws-java-sdk-pom:1.11.563,org.apache.hadoop:hadoop-aws:3.2.0 src/python/neptune_loader.py
Finally
Apart from circa 30 minutes of dependency hell, creating this solution was actually pretty straightforward. Once you have the data in S3 you can trigger the loader through a POST request:
curl -X POST \
-H 'Content-Type: application/json' \
https://your-neptune-endpoint:port/loader -d '
{
"source" : "s3://bucket-name/object-key-name",
"format" : "format",
"iamRoleArn" : "arn:aws:iam::account-id:role/role-name",
"region" : "region",
"failOnError" : "FALSE",
"parallelism" : "MEDIUM",
"updateSingleCardinalityProperties" : "FALSE",
"queueRequest" : "TRUE",
"dependencies" : ["load_A_id", "load_B_id"]
}'
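If you would rather trigger the load from Python than from curl, the request above can be built with the standard library alone. The sketch below only constructs the request; the function name build_load_request, the endpoint, bucket and role values are placeholders, and "csv" as the format value is an assumption that the Gremlin CSV files described earlier are being loaded.

```python
import json
import urllib.request


def build_load_request(endpoint, source, iam_role_arn, region):
    """Build (but do not send) the POST request for the Neptune bulk loader."""
    payload = {
        "source": source,
        "format": "csv",  # assumption: Gremlin CSV input
        "iamRoleArn": iam_role_arn,
        "region": region,
        "failOnError": "FALSE",
        "parallelism": "MEDIUM",
        "updateSingleCardinalityProperties": "FALSE",
        "queueRequest": "TRUE",
    }
    return urllib.request.Request(
        url=f"{endpoint}/loader",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Placeholder values; replace them with your own endpoint, bucket and role.
request = build_load_request(
    endpoint="https://your-neptune-endpoint:8182",
    source="s3://bucket-name/object-key-name",
    iam_role_arn="arn:aws:iam::account-id:role/role-name",
    region="eu-west-1",
)

# Sending it requires network access to the cluster:
# with urllib.request.urlopen(request) as response:
#     print(response.read().decode("utf-8"))
```

Keeping the request construction separate from the actual call makes it easy to inspect or log the payload before anything is sent to the cluster.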