From f73f1627d4fbed5deaaf56340d53976e20451551 Mon Sep 17 00:00:00 2001
From: Samujjwaal Dey
Date: Mon, 11 Jan 2021 12:58:02 -0600
Subject: [PATCH] Fixes #1 - Combined reducer classes for Job 4 & 5

---
 .../scala/com/samujjwaal/hw2/RunJobs.scala    | 10 ++--
 ...ducer.scala => CoAuthorCountReducer.scala} | 42 +++++++++++-----
 .../reducers/ZeroCoAuthorCountReducer.scala   | 49 -------------------
 3 files changed, 38 insertions(+), 63 deletions(-)
 rename src/main/scala/com/samujjwaal/hw2/reducers/{MostCoAuthorCountReducer.scala => CoAuthorCountReducer.scala} (50%)
 delete mode 100644 src/main/scala/com/samujjwaal/hw2/reducers/ZeroCoAuthorCountReducer.scala

diff --git a/src/main/scala/com/samujjwaal/hw2/RunJobs.scala b/src/main/scala/com/samujjwaal/hw2/RunJobs.scala
index f721178..c2da4d3 100644
--- a/src/main/scala/com/samujjwaal/hw2/RunJobs.scala
+++ b/src/main/scala/com/samujjwaal/hw2/RunJobs.scala
@@ -1,6 +1,6 @@
 package com.samujjwaal.hw2
 
-import com.samujjwaal.hw2.mappers.{CoAuthorCountMapper, VenueOneAuthorMapper, VenueTopPubMapper, VenueTopTenAuthorsMapper}
+import com.samujjwaal.hw2.mappers._
 import com.samujjwaal.hw2.reducers._
 import com.samujjwaal.hw2.util.XmlInputFormatWithMultipleTags
 import com.typesafe.config.{Config, ConfigFactory}
@@ -87,13 +87,15 @@ object RunJobs {
     }
 
     if (args(0) == "4") {
+      // for selecting reducer cleanup operation
+      configuration.set("jobNo",args(0))
       val authorCount1 = Job.getInstance(configuration, "List of top 100 authors who publish with most co-authors(in desc. order)")
       authorCount1.setJarByClass(this.getClass)
       //Setting mapper
       authorCount1.setMapperClass(classOf[CoAuthorCountMapper])
       authorCount1.setInputFormatClass(classOf[XmlInputFormatWithMultipleTags])
       //setting reducer
-      authorCount1.setReducerClass(classOf[MostCoAuthorCountReducer])
+      authorCount1.setReducerClass(classOf[CoAuthorCountReducer])
       authorCount1.setMapOutputKeyClass(classOf[Text])
       authorCount1.setMapOutputValueClass(classOf[IntWritable])
       authorCount1.setOutputKeyClass(classOf[Text])
@@ -105,13 +107,15 @@
     }
 
     if (args(0) == "5") {
+      // for selecting reducer cleanup operation
+      configuration.set("jobNo",args(0))
       val authorCount2 = Job.getInstance(configuration, "List of 100 authors who publish without co-authors")
       authorCount2.setJarByClass(this.getClass)
       //Setting mapper
       authorCount2.setMapperClass(classOf[CoAuthorCountMapper])
       authorCount2.setInputFormatClass(classOf[XmlInputFormatWithMultipleTags])
       //setting reducer
-      authorCount2.setReducerClass(classOf[ZeroCoAuthorCountReducer])
+      authorCount2.setReducerClass(classOf[CoAuthorCountReducer])
       authorCount2.setMapOutputKeyClass(classOf[Text])
       authorCount2.setMapOutputValueClass(classOf[IntWritable])
       authorCount2.setOutputKeyClass(classOf[Text])
diff --git a/src/main/scala/com/samujjwaal/hw2/reducers/MostCoAuthorCountReducer.scala b/src/main/scala/com/samujjwaal/hw2/reducers/CoAuthorCountReducer.scala
similarity index 50%
rename from src/main/scala/com/samujjwaal/hw2/reducers/MostCoAuthorCountReducer.scala
rename to src/main/scala/com/samujjwaal/hw2/reducers/CoAuthorCountReducer.scala
index eafa27a..5bdf93f 100644
--- a/src/main/scala/com/samujjwaal/hw2/reducers/MostCoAuthorCountReducer.scala
+++ b/src/main/scala/com/samujjwaal/hw2/reducers/CoAuthorCountReducer.scala
@@ -1,7 +1,6 @@
 package com.samujjwaal.hw2.reducers
 
 import java.lang
-
 import org.apache.hadoop.io.{IntWritable, Text}
 import org.apache.hadoop.mapreduce.Reducer
 import org.slf4j.{Logger, LoggerFactory}
@@ -10,9 +9,9 @@ import scala.collection.mutable
 import scala.jdk.CollectionConverters.IterableHasAsScala
 
 /**
- * Reducer class to calculate the maximum number of co-authors for each author and return top 100 authors
+ * Reducer class to calculate the number of co-authors for each author
  */
-class MostCoAuthorCountReducer extends Reducer[Text, IntWritable, Text, Text] {
+class CoAuthorCountReducer extends Reducer[Text, IntWritable, Text, Text] {
 
   val logger: Logger = LoggerFactory.getLogger(this.getClass)
   // hashmap to store author name and max coauthor count to sort at end of reduce task
@@ -35,14 +34,35 @@ class MostCoAuthorCountReducer extends Reducer[Text, IntWritable, Text, Text] {
    */
   override def cleanup(context: Reducer[Text, IntWritable, Text, Text]#Context): Unit = {
 
-    // sort hashmap in descending order by coauthor count of each author and select top 100
-    val sortedMap = mutable.LinkedHashMap(map.toSeq.sortWith(_._2 > _._2): _*).take(100)
+    val outputFlag = context.getConfiguration.get("jobNo")
 
-    logger.info("Authors: {}", sortedMap.keys)
-    sortedMap.foreach(record => {
-      context.write(new Text(record._1), new Text(record._2.toString))
-    })
-    // reducer outputs key: & value:
-  }
+    // find top 100 authors who have published with the maximum number of co-authors
+    if (outputFlag == "4"){
+
+      // sort hashmap in descending order by coauthor count of each author and select top 100
+      val sortedMap = mutable.LinkedHashMap(map.toSeq.sortWith(_._2 > _._2): _*).take(100)
+
+      logger.info("Authors: {}", sortedMap.keys)
+      sortedMap.foreach(record => {
+        context.write(new Text(record._1), new Text(record._2.toString))
+      })
+
+      // reducer outputs key: & value:
+    }
+    // find 100 authors who have published with 0 co-authors
+    if (outputFlag == "5"){
+
+      // filter hashmap by coauthor count equals 0 and select 100
+      val outputMap = map.filter(_._2 == 0).take(100)
+
+      logger.info("Authors: {}", outputMap.keys)
+      outputMap.foreach(record => {
+        context.write(new Text(record._1), new Text(record._2.toString))
+      })
+
+      // reducer outputs key: & value:<0>
+    }
+
+  }
 }
diff --git a/src/main/scala/com/samujjwaal/hw2/reducers/ZeroCoAuthorCountReducer.scala b/src/main/scala/com/samujjwaal/hw2/reducers/ZeroCoAuthorCountReducer.scala
deleted file mode 100644
index 75b4a92..0000000
--- a/src/main/scala/com/samujjwaal/hw2/reducers/ZeroCoAuthorCountReducer.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-package com.samujjwaal.hw2.reducers
-
-import java.lang
-
-import org.apache.hadoop.io.{IntWritable, Text}
-import org.apache.hadoop.mapreduce.Reducer
-import org.slf4j.{Logger, LoggerFactory}
-
-import scala.collection.mutable
-import scala.jdk.CollectionConverters._
-
-/**
- * Reducer class to find authors who have published with 0 co-authors
- */
-class ZeroCoAuthorCountReducer extends Reducer[Text, IntWritable, Text, Text] {
-
-  val logger: Logger = LoggerFactory.getLogger(this.getClass)
-  // hashmap to store author name and max coauthor count to sort at end of reduce task
-  var map: mutable.Map[String, Integer] = mutable.LinkedHashMap[String, Integer]()
-
-  override def reduce(key: Text, values: lang.Iterable[IntWritable], context: Reducer[Text, IntWritable, Text, Text]#Context): Unit = {
-    var list = mutable.ArrayBuffer[Int]()
-
-    // populate list of co-author count of an author(key) for all his publications
-    values.asScala.foreach(record => {
-      list += record.get()
-    })
-
-    map.put(key.toString, list.max)
-    logger.info("Maximum no of co-authors for author {}: {}", key.toString, list.max)
-  }
-
-  /**
-   * This method is called at the end of all reduce tasks of the job to sort all authors
-   */
-  override def cleanup(context: Reducer[Text, IntWritable, Text, Text]#Context): Unit = {
-
-    // filter hashmap by coauthor count equals 0 and select 100
-    val outputMap = map.filter(_._2 == 0).take(100)
-
-    logger.info("Authors: {}", outputMap.keys)
-    outputMap.foreach(record => {
-      context.write(new Text(record._1), new Text(record._2.toString))
-    })
-
-    // reducer outputs key: & value:<0>
-  }
-
-}