
Commit

Fixes #1
- Combined reducer classes for Job 4 & 5
samujjwaal committed Jan 11, 2021
1 parent 8d3611f commit f73f162
Showing 3 changed files with 38 additions and 63 deletions.
10 changes: 7 additions & 3 deletions src/main/scala/com/samujjwaal/hw2/RunJobs.scala
@@ -1,6 +1,6 @@
package com.samujjwaal.hw2

import com.samujjwaal.hw2.mappers.{CoAuthorCountMapper, VenueOneAuthorMapper, VenueTopPubMapper, VenueTopTenAuthorsMapper}
import com.samujjwaal.hw2.mappers._
import com.samujjwaal.hw2.reducers._
import com.samujjwaal.hw2.util.XmlInputFormatWithMultipleTags
import com.typesafe.config.{Config, ConfigFactory}
@@ -87,13 +87,15 @@ object RunJobs {
}

if (args(0) == "4") {
// for selecting reducer cleanup operation
configuration.set("jobNo",args(0))
val authorCount1 = Job.getInstance(configuration, "List of top 100 authors who publish with most co-authors(in desc. order)")
authorCount1.setJarByClass(this.getClass)
//Setting mapper
authorCount1.setMapperClass(classOf[CoAuthorCountMapper])
authorCount1.setInputFormatClass(classOf[XmlInputFormatWithMultipleTags])
//setting reducer
authorCount1.setReducerClass(classOf[MostCoAuthorCountReducer])
authorCount1.setReducerClass(classOf[CoAuthorCountReducer])
authorCount1.setMapOutputKeyClass(classOf[Text])
authorCount1.setMapOutputValueClass(classOf[IntWritable])
authorCount1.setOutputKeyClass(classOf[Text])
@@ -105,13 +107,15 @@ object RunJobs {
}

if (args(0) == "5") {
// for selecting reducer cleanup operation
configuration.set("jobNo",args(0))
val authorCount2 = Job.getInstance(configuration, "List of 100 authors who publish without co-authors")
authorCount2.setJarByClass(this.getClass)
//Setting mapper
authorCount2.setMapperClass(classOf[CoAuthorCountMapper])
authorCount2.setInputFormatClass(classOf[XmlInputFormatWithMultipleTags])
//setting reducer
authorCount2.setReducerClass(classOf[ZeroCoAuthorCountReducer])
authorCount2.setReducerClass(classOf[CoAuthorCountReducer])
authorCount2.setMapOutputKeyClass(classOf[Text])
authorCount2.setMapOutputValueClass(classOf[IntWritable])
authorCount2.setOutputKeyClass(classOf[Text])
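
For context, here is a minimal, self-contained sketch of the driver-side pattern the hunks above introduce: the job number from the command line is stored on the Configuration under the key "jobNo" before the Job is built, so the single shared reducer can pick its cleanup behaviour at runtime. The object name, job title string, run helper, and the input/output path handling are illustrative assumptions and not the repository's actual RunJobs code; only the mapper, reducer, input format, and key/value classes are taken from the diff.

import com.samujjwaal.hw2.mappers.CoAuthorCountMapper
import com.samujjwaal.hw2.reducers.CoAuthorCountReducer
import com.samujjwaal.hw2.util.XmlInputFormatWithMultipleTags
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat

// Driver-side sketch: one job definition serves both Job 4 and Job 5.
// The chosen job number is written to the Configuration as "jobNo" before
// Job.getInstance, so CoAuthorCountReducer can read it back in cleanup().
object CombinedJobDriverSketch {
  def run(jobNo: String, in: String, out: String): Boolean = {
    val conf = new Configuration()
    conf.set("jobNo", jobNo)                             // "4" or "5" selects the cleanup branch

    val job = Job.getInstance(conf, s"Co-author count (job $jobNo)")
    job.setJarByClass(this.getClass)

    job.setMapperClass(classOf[CoAuthorCountMapper])     // same mapper for both jobs
    job.setInputFormatClass(classOf[XmlInputFormatWithMultipleTags])
    job.setReducerClass(classOf[CoAuthorCountReducer])   // combined reducer

    job.setMapOutputKeyClass(classOf[Text])
    job.setMapOutputValueClass(classOf[IntWritable])
    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[Text])

    // Illustrative paths; the real RunJobs wiring is not shown in this diff.
    FileInputFormat.addInputPath(job, new Path(in))
    FileOutputFormat.setOutputPath(job, new Path(out))
    job.waitForCompletion(true)
  }
}
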
src/main/scala/com/samujjwaal/hw2/reducers/CoAuthorCountReducer.scala
@@ -1,7 +1,6 @@
package com.samujjwaal.hw2.reducers

import java.lang

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.Reducer
import org.slf4j.{Logger, LoggerFactory}
@@ -10,9 +9,9 @@ import scala.collection.mutable
import scala.jdk.CollectionConverters.IterableHasAsScala

/**
* Reducer class to calculate the maximum number of co-authors for each author and return top 100 authors
* Reducer class to calculate the number of co-authors for each author
*/
class MostCoAuthorCountReducer extends Reducer[Text, IntWritable, Text, Text] {
class CoAuthorCountReducer extends Reducer[Text, IntWritable, Text, Text] {

val logger: Logger = LoggerFactory.getLogger(this.getClass)
// hashmap to store author name and max coauthor count to sort at end of reduce task
@@ -35,14 +34,35 @@ class MostCoAuthorCountReducer extends Reducer[Text, IntWritable, Text, Text] {
*/
override def cleanup(context: Reducer[Text, IntWritable, Text, Text]#Context): Unit = {

// sort hashmap in descending order by coauthor count of each author and select top 100
val sortedMap = mutable.LinkedHashMap(map.toSeq.sortWith(_._2 > _._2): _*).take(100)
val outputFlag = context.getConfiguration.get("jobNo")

logger.info("Authors: {}", sortedMap.keys)
sortedMap.foreach(record => {
context.write(new Text(record._1), new Text(record._2.toString))
})
// reducer outputs key:<author name> & value:<max. number of coauthors>
}
// find top 100 authors who have published with the maximum number of co-authors
if (outputFlag == "4"){

// sort hashmap in descending order by coauthor count of each author and select top 100
val sortedMap = mutable.LinkedHashMap(map.toSeq.sortWith(_._2 > _._2): _*).take(100)

logger.info("Authors: {}", sortedMap.keys)
sortedMap.foreach(record => {
context.write(new Text(record._1), new Text(record._2.toString))
})

// reducer outputs key:<author name> & value:<max. number of coauthors>
}

// find 100 authors who have published with 0 co-authors
if (outputFlag == "5"){

// filter hashmap by coauthor count equals 0 and select 100
val outputMap = map.filter(_._2 == 0).take(100)

logger.info("Authors: {}", outputMap.keys)
outputMap.foreach(record => {
context.write(new Text(record._1), new Text(record._2.toString))
})

// reducer outputs key:<author name> & value:<0>
}

}
}

This file was deleted.
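
Putting the visible hunks together, the following is a hedged sketch of the combined reducer after this commit, with the deleted reducer class folded into it. Only cleanup() appears in the diff above: the reduce() body shown here, which keeps the maximum co-author count per author, is an assumption based on the class's "max coauthor count" comment, and the Sketch suffix on the class name marks the whole block as illustrative rather than the repository's exact source.

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.Reducer

import scala.collection.mutable
import scala.jdk.CollectionConverters.IterableHasAsScala

class CoAuthorCountReducerSketch extends Reducer[Text, IntWritable, Text, Text] {

  // author name -> co-author count, accumulated across reduce() calls
  val map = new mutable.HashMap[String, Int]()

  override def reduce(key: Text, values: java.lang.Iterable[IntWritable],
                      context: Reducer[Text, IntWritable, Text, Text]#Context): Unit = {
    // Assumed aggregation: keep the largest co-author count seen for this author.
    map(key.toString) = values.asScala.map(_.get).max
  }

  override def cleanup(context: Reducer[Text, IntWritable, Text, Text]#Context): Unit = {
    val jobNo = context.getConfiguration.get("jobNo")    // set by the driver

    if (jobNo == "4") {
      // Job 4: top 100 authors by co-author count, in descending order.
      mutable.LinkedHashMap(map.toSeq.sortWith(_._2 > _._2): _*)
        .take(100)
        .foreach { case (author, n) => context.write(new Text(author), new Text(n.toString)) }
    }

    if (jobNo == "5") {
      // Job 5: 100 authors whose co-author count is zero.
      map.filter(_._2 == 0)
        .take(100)
        .foreach { case (author, n) => context.write(new Text(author), new Text(n.toString)) }
    }
  }
}

With this in place, Jobs 4 and 5 differ only in the "jobNo" value the driver sets, which is what allows the two original reducer classes to collapse into one.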
