In the last post, Apache Spark as a Distributed SQL Engine, we explained how we could use SQL to query our data stored within Hadoop. Our engine is capable of reading CSV files from a distributed file system, auto discovering the schema from the files and exposing them as tables through the Hive meta store. All this was done to be able to connect standard SQL clients to our engine and explore our data set without manually defining the schema of our files, avoiding ETL work.

Spark provides a framework that can be extended, and we will push its capabilities even further by extending some of its functionalities.

Spark Data Source API

The Data Source API allows us to manage structured data in any format. Spark already ships with readers for some standard formats, such as Parquet and JSON, and third parties have created new readers for CSV, Avro and others by extending this API. Today we are going to create our own.

We have two reasons to extend the API. First, we want a library that is capable of reading our legacy format and transforming our current data source into a new one that is easier to use. Second, we want to share this library across all our applications that use our data, avoiding complex packaging of applications that need to be shared in order to achieve the same goal.

The Data Source

Our data source consists of a collection of files where each file is an entity by itself. For the sake of this example, we have defined a simple format where each file is a text file containing the information of a user, one field per line. Let's see an example of a file.

pepe
20
Miami
Cuba

This file represents a user called 'pepe' who is 20 years old, lives in Miami and was born in Cuba. In the real world, the format can be as complicated as we want, but the process we are going to explain will not change.

Each file has the same format and we have millions of them. We also want to expose them to be queried in SQL.
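To make the format concrete before bringing Spark in, here is a small plain-Scala sketch that parses one such file into a typed record. LegacyUser and LegacyUserFormat are hypothetical names. The sketch assumes exactly the four fields of the example, in that order.

```scala
// Hypothetical record for one legacy file: one field per line.
final case class LegacyUser(name: String, age: Int, city: String, country: String)

object LegacyUserFormat {
  // Parses the whole content of one file (one file = one entity).
  // Splitting on \r?\n also accepts files written with Windows line endings;
  // String.split drops trailing empty strings, so a final newline is harmless.
  def parse(content: String): LegacyUser = {
    val lines = content.split("\r?\n").map(_.trim)
    require(lines.length >= 4, s"expected 4 fields, found ${lines.length}")
    LegacyUser(lines(0), lines(1).toInt, lines(2), lines(3))
  }
}

object LegacyUserFormatExample {
  def main(args: Array[String]): Unit = {
    val user = LegacyUserFormat.parse("pepe\n20\nMiami\nCuba\n")
    println(user) // LegacyUser(pepe,20,Miami,Cuba)
  }
}
```

The Spark reader later in the post does the same job at scale. It reads each file whole and keeps only the fields it needs.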
Our Implementation

In order to extend the Data Source API, we need to implement certain classes from the Spark framework so our custom reader can be loaded and used.

Let's start by creating a Spark application as the entry point to our example. We can do this by following the post SBT, Scala and Spark.

The first thing we need to do once the app has been created is to link the correct Spark libraries. We are going to be running the examples on Spark 1.5.1, and our sbt file declares the Spark dependencies for that version.

Creating Our Schema

The starting extension point of the Data Source API is the RelationProvider trait. RelationProvider will be used to create the necessary relations of our data.

We also need to mix in the SchemaRelationProvider trait, which allows us to create the schema that we want.

We need to create a class named DefaultSource, and Spark will look for it in a given package. The DefaultSource class will extend RelationProvider and mix in SchemaRelationProvider.

In our DefaultSource, we are basically creating a LegacyRelation object, which defines the Relation we want to create. Think of a relation as a collection of tuples with a known schema.

In our LegacyRelation class, we override the schema function so it returns the schema we want. In this example we know the schema of our data, but here we could do anything we want to obtain the required schema. If the data were CSV, we could infer the schema using the headers of the file or do any other operations we need.

Notice that we only want the name and age fields instead of the entire content of our entities.

The next step is to test that we are getting the correct schema. We can do this by adding code to our app that creates a SparkContext and an SQLContext from it. Using the SQLContext, we set the format by passing the package name (remember that Spark will look in this package for the DefaultSource class). Then we load the data in the specified path into a DataFrame using our provider.
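A minimal sketch of this step against the Spark 1.5 Data Source API might look like the following. It assumes a hypothetical package com.example.legacy and a hypothetical entry point named SchemaApp. The build.sbt lines in the header comment are an assumption about how the Spark 1.5.1 dependencies could be declared, not the post's actual build file.

```scala
// Assumed build.sbt dependencies (Spark 1.5.1, as used in the post):
//   libraryDependencies ++= Seq(
//     "org.apache.spark" %% "spark-core" % "1.5.1" % "provided",
//     "org.apache.spark" %% "spark-sql"  % "1.5.1" % "provided")
package com.example.legacy // hypothetical package name

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, SchemaRelationProvider}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Spark resolves format("com.example.legacy") to com.example.legacy.DefaultSource.
class DefaultSource extends RelationProvider with SchemaRelationProvider {

  // Used when the caller does not supply a schema.
  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String]): BaseRelation =
    createRelation(sqlContext, parameters, null)

  // Used when the caller supplies a schema with .schema(...); null means "none".
  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String],
                              schema: StructType): BaseRelation = {
    val path = parameters.getOrElse("path", sys.error("'path' must be specified"))
    new LegacyRelation(path, Option(schema))(sqlContext)
  }
}

// A relation is a collection of tuples with a known schema. There is no scan yet.
class LegacyRelation(val location: String, userSchema: Option[StructType])
                    (@transient val sqlContext: SQLContext)
    extends BaseRelation with Serializable {

  // Expose only name and age, not the whole entity.
  override def schema: StructType = userSchema.getOrElse(StructType(Seq(
    StructField("name", StringType, nullable = true),
    StructField("age", IntegerType, nullable = true))))
}

object SchemaApp {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("legacy-schema"))
    val sqlContext = new SQLContext(sc)
    // load(path) hands the path to DefaultSource as parameters("path").
    val df = sqlContext.read.format("com.example.legacy").load(args(0))
    df.printSchema()
    // root
    //  |-- name: string (nullable = true)
    //  |-- age: integer (nullable = true)
    sc.stop()
  }
}
```

Run it with spark-submit, which supplies the master URL, and pass the directory of legacy files as the first argument. Delegating the two-argument createRelation to the three-argument one keeps a single construction path. Option(schema) turns the null into None.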
Calling printSchema() on this DataFrame prints the schema we defined.

At this point, we have only created the schema we want. Nothing yet says how to read the data or how to structure it into our defined schema.

Reading Data Into Our Schema

In order to read from our data source, our LegacyRelation class needs to mix in the TableScan trait. TableScan has a single method we need to implement, with the signature def buildScan(): RDD[Row].

The buildScan method should return all rows from our data source. In our particular case, each row will be the selected content of each file.

Our implementation of buildScan uses the wholeTextFiles method, which reads each file in its entirety (each file is an entity). It then takes the first two lines, which hold the only fields we want, and creates a row from them. The result is a collection of rows where each row is created using only the part of the file we care about.

This is enough to modify our app so it prints out the content of our data source.

Even though we are reading the desired format into a data frame, there is no information about the field types of our data. Our schema definition supports different data types, yet we are not enforcing them. Let's modify our buildScan method so it uses the type information from the schema when creating each row.

The only change is that we cast each value read from our files into its correct type, taken from the schema.fields object. In our particular case, we are only interested in name being a String and age an Integer, but again, we could be very creative at this point.

With this change, our LegacyRelation class is complete. Now we can load our data into a DataFrame and register it to be used by SQL clients, as we explained in our previous post. Our app remains just as simple.
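Here is a sketch of the finished relation. It again assumes the hypothetical package com.example.legacy and the Spark 1.5 API. The relation mixes in TableScan, reads each file with wholeTextFiles, keeps only as many lines as the schema has fields, and casts each value according to schema.fields. castValue is a hypothetical helper that only handles the two types this example needs.

```scala
package com.example.legacy // hypothetical package name (same as DefaultSource)

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.{DataType, IntegerType, StringType, StructField, StructType}

class LegacyRelation(val location: String, userSchema: Option[StructType])
                    (@transient val sqlContext: SQLContext)
    extends BaseRelation with TableScan with Serializable {

  override def schema: StructType = userSchema.getOrElse(StructType(Seq(
    StructField("name", StringType, nullable = true),
    StructField("age", IntegerType, nullable = true))))

  // One Row per file, built from its first lines and typed by the schema.
  override def buildScan(): RDD[Row] = {
    val fields = schema.fields // evaluated on the driver; the closure captures only this array
    sqlContext.sparkContext
      .wholeTextFiles(location) // RDD of (filePath, wholeFileContent)
      .map { case (_, content) =>
        // zip stops at the shorter side, so only the first fields.length lines are used
        val values = content.split("\r?\n").zip(fields).map { case (raw, field) =>
          LegacyRelation.castValue(raw.trim, field.dataType)
        }
        Row.fromSeq(values.toSeq)
      }
  }
}

object LegacyRelation {
  // Converts a raw line into the JVM value Spark expects for the column type.
  def castValue(value: String, dataType: DataType): Any = dataType match {
    case StringType  => value
    case IntegerType => value.toInt
    case other       => throw new UnsupportedOperationException(s"Unsupported type: $other")
  }
}
```

If the DefaultSource returns this relation, the app needs only a few extra calls. sqlContext.read.format("com.example.legacy").load(path) followed by registerTempTable("users") (Spark 1.5 API) makes the data queryable with sqlContext.sql("SELECT name FROM users WHERE age > 18").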
We have shown enough to read a custom format into a data frame so we can take advantage of the DataFrame API, yet more can be done.

The Data Source API not only offers functionality for reading data, but also for writing it in a custom format. This is very powerful if we want to transform a data set from one format to another. Let's see how we add these capabilities to our existing driver.

Writing a Formatter

Let's suppose we want to save our data so it can be read by other standard systems. We are going to load our custom data source and create CSV-like output from it.

In order to support save calls from the API, our DefaultSource class has to mix in the CreatableRelationProvider trait. This trait has a method called createRelation that we need to implement.

In our implementation, we basically save our data frame as a CSV-like file and then return a relation with a known schema.

The saveAsCsvFile method creates an RDD[String] with our data formatted as CSV, then saves it to the given path. For simplicity, we did not include headers in our output files, but remember we can do whatever we need to output the data in the format we require.

In order to save our original data in a CSV-like format, we modify our app to write the loaded DataFrame back out through our provider.

Note that every time we read or write our data, we need to specify the package name where our DefaultSource class is located.

We can now package our library and include it in any project that needs the data source we described. Many other libraries are being created to support all the formats we can imagine, and now you can create your own to contribute to the community or just to use in your own projects.

Endings

We have seen how to load data from a custom format into data frames using the Spark Data Source API.
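Putting the read and write sides together, a complete version of the library could look like the sketch below. It assumes the hypothetical package com.example.legacy, the Spark 1.5 API, and a hypothetical LegacyToCsvApp that converts a directory of legacy files into CSV-like text. DefaultSource now also mixes in CreatableRelationProvider. Its save path writes rows with saveAsCsvFile (no header, no quoting) and then returns a relation with the written schema. For brevity, the SaveMode argument is ignored, so saving to an existing path fails.

```scala
package com.example.legacy // hypothetical package name; requires spark-core/spark-sql 1.5.x

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider,
  RelationProvider, SchemaRelationProvider, TableScan}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

class DefaultSource extends RelationProvider with SchemaRelationProvider
    with CreatableRelationProvider {

  // Read without a user schema: sqlContext.read.format(...).load(path)
  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String]): BaseRelation =
    createRelation(sqlContext, parameters, null)

  // Read with an optional user-supplied schema.
  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String],
                              schema: StructType): BaseRelation =
    new LegacyRelation(DefaultSource.path(parameters), Option(schema))(sqlContext)

  // Write: df.write.format(...).save(path). This sketch ignores `mode`.
  override def createRelation(sqlContext: SQLContext,
                              mode: SaveMode,
                              parameters: Map[String, String],
                              data: DataFrame): BaseRelation = {
    DefaultSource.saveAsCsvFile(data, DefaultSource.path(parameters))
    createRelation(sqlContext, parameters, data.schema) // relation with a known schema
  }
}

object DefaultSource {
  def path(parameters: Map[String, String]): String =
    parameters.getOrElse("path", sys.error("'path' must be specified"))

  // One CSV-like line per row: no header, no quoting or escaping; null becomes "".
  val toCsvLine: Row => String =
    row => row.toSeq.map(v => if (v == null) "" else v.toString).mkString(",")

  def saveAsCsvFile(data: DataFrame, path: String): Unit =
    data.rdd.map(toCsvLine).saveAsTextFile(path)
}

class LegacyRelation(val location: String, userSchema: Option[StructType])
                    (@transient val sqlContext: SQLContext)
    extends BaseRelation with TableScan with Serializable {

  override def schema: StructType = userSchema.getOrElse(StructType(Seq(
    StructField("name", StringType, nullable = true),
    StructField("age", IntegerType, nullable = true))))

  override def buildScan(): RDD[Row] = {
    val fields = schema.fields
    sqlContext.sparkContext.wholeTextFiles(location).map { case (_, content) =>
      // First fields.length lines of each file, typed according to the schema.
      val values = content.split("\r?\n").zip(fields).map { case (v, f) =>
        f.dataType match {
          case IntegerType => v.trim.toInt
          case _           => v.trim
        }
      }
      Row.fromSeq(values.toSeq)
    }
  }
}

// Converts the legacy files under args(0) into CSV-like text under args(1).
object LegacyToCsvApp {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("legacy-to-csv"))
    val sqlContext = new SQLContext(sc)
    val users = sqlContext.read.format("com.example.legacy").load(args(0))
    users.write.format("com.example.legacy").save(args(1))
    sc.stop()
  }
}
```

toCsvLine is a function value rather than a method, so the closure passed to map does not need to capture anything beyond that function.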
We also reviewed the classes involved in the process, especially how Spark uses the DefaultSource class from our package to perform the required operations. We also implemented an output formatter so our data frames can be saved the way we like.

There is much more we can do with the Data Source API, but in my experience finding the right documentation has been quite hard. I believe better documentation could be created, specifically for those parts of the API that are most useful when extending it.

Even though our example shows how to extend the Data Source API to support a simple format, it can be modified to read and write more complex types, such as binary encoded entities.

The ability to integrate our own data formats into Spark makes it one of the top frameworks for data processing out there. In the Hadoop world we can find a lot of tools that share goals and functionalities, but none of them is as flexible and versatile as Spark. This makes Spark very desirable in this field. If we are interested in a processing framework capable of working in virtually any scenario, then Apache Spark is the way to go.

If you find this post useful, please recommend it so others can benefit from it.

Read next:
Apache Spark as a Distributed SQL Engine
How MapR improves our productivity and simplifies our design.