I had been trying to use the Distributed Cache in Pig. After a lot of trial and error: behold, SUCCESS! Let's get to the meat and go through the steps.
a) Create an Eval UDF.
b) Initialize the Distributed Cache by overriding getCacheFiles().
c) Initialize the data structure from the cached file of step b.
d) Finally, apply your logic to the data.
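Before looking at the code, a quick note on the lookup file. The UDF below reads the cached file line by line and splits each line on #, storing the first field as the map key and the second as the value. So the file on HDFS (the one shipped into the cache) should hold one id#regex pair per line. The ids and patterns here are made up purely for illustration; substitute your own:

sports#(?i)football|cricket|nba
music#(?i)concert|album|guitar
movies#(?i)trailer|imdb|netflix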
package UDF;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map.Entry;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.fs.Path;
import org.apache.pig.EvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.WrappedIOException;

public class Regex extends EvalFunc<String> {

    // id -> regex pattern, loaded once per task from the cached lookup file
    static HashMap<String, String> map = new HashMap<String, String>();

    // Tell Pig which HDFS file to ship via the Distributed Cache.
    // The "#id_lookup" suffix creates a symlink named id_lookup in each
    // task's working directory.
    @Override
    public List<String> getCacheFiles() {
        Path lookup_file = new Path(
                "hdfs://localhost.localdomain:8020/user/cloudera/top");
        List<String> list = new ArrayList<String>(1);
        list.add(lookup_file + "#id_lookup");
        return list;
    }

    // Read the cached file through its local symlink and populate the map;
    // each line holds one id#regex pair.
    public void VectorizeData() throws IOException {
        FileReader fr = new FileReader("./id_lookup");
        BufferedReader brd = new BufferedReader(fr);
        String line;
        while ((line = brd.readLine()) != null) {
            String str[] = line.split("#");
            map.put(str[0], str[1]);
        }
        brd.close();
    }

    // Return the id (key) of the first regex (value) that matches the tweet.
    private String Regex(String tweet) throws ExecException {
        for (Entry<String, String> entry : map.entrySet()) {
            Pattern r = Pattern.compile(entry.getValue());
            Matcher m = r.matcher(tweet);
            if (m.find()) {
                return entry.getKey();
            }
        }
        return null;
    }

    @Override
    public String exec(Tuple input) throws IOException {
        if (input == null || input.size() < 1 || input.get(0) == null)
            return null;
        try {
            // Load the lookup data once per task, not once per tuple.
            if (map.isEmpty()) {
                VectorizeData();
            }
            String str = (String) input.get(0);
            return Regex(str);
        } catch (Exception e) {
            throw WrappedIOException.wrap(
                    "Caught exception processing input row ", e);
        }
    }
}
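With the jar built, using the UDF from Pig is just a REGISTER away. Here's a minimal sketch of the script, assuming the class above is packaged into regex-udf.jar and the tweets sit in tweets.txt with one tweet per line (both names are mine, adjust them to your setup):

-- ship the UDF jar with the job
REGISTER regex-udf.jar;

-- one chararray tweet per line
tweets = LOAD 'tweets.txt' AS (text:chararray);

-- tag each tweet with the id of the first matching regex
tagged = FOREACH tweets GENERATE text, UDF.Regex(text) AS id;

DUMP tagged;

Pig calls getCacheFiles() while setting up the job, ships the HDFS file to every task node, and each task then reads it through the local symlink ./id_lookup; no extra configuration is needed in the script itself.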