python - Results of reduce and count differ in pyspark


For my Spark trials, I downloaded the NY taxi CSV files and merged them into a single file, nytaxi.csv, which I saved in Hadoop FS. I am using Spark on YARN with 7 NodeManagers.

I am connecting to Spark from an IPython notebook.

Here is a sample Python script for counting the number of lines in nytaxi.csv.

    nytaxi = sc.textFile("hdfs://bigdata6:8020/user/baris/nytaxi/nytaxi.csv")
    filtered = nytaxi.filter(lambda x: "distance" not in x)
    splits = filtered.map(lambda x: float(x.split(",")[9]))
    splits.cache()
    splits.count()

This returns 73491693. However, when I try to count the lines with the following code, it returns a value around 803000.

    def plusone(sum, v):
        # print sum, v
        return sum + 1

    splits.reduce(plusone)

I wonder why the results vary. Thanks.

A sample line from the CSV: u'740bd5be61840be4fe3905cc3ebe3e7e,e48b185060fb0ff49be6da43e69e624b,cmt,1,n,2013-10-01 12:44:29,2013-10-01 12:53:26,1,536,1.20,-73.974319,40.741859,-73.99115,40.742424'

The problem, as Daniel specified, is that the operation used in reduce must be associative and commutative. Here's the reason, from the source itself:

    val reducePartition: Iterator[T] => Option[T] = iter => {
      if (iter.hasNext) {
        Some(iter.reduceLeft(cleanF))
      } else {
        None
      }
    }

Notice that the reduce done on each partition is a simple delegation to its iterator's reduceLeft. This does not cause any problems, since it is simply an accumulation of values.

    val mergeResult = (index: Int, taskResult: Option[T]) => {
      if (taskResult.isDefined) {
        jobResult = jobResult match {
          case Some(value) => Some(f(value, taskResult.get))
          case None => taskResult
        }
      }
    }

But the merging of the partitions is the problem. Here's how it would break down in your example (assume a count of 40 on 4 evenly split partitions):

    a = 10; b = 10; c = 10; d = 10  // local reductions. OK
    a added in = 10                 // still OK
    b added in = f(10, 10) = 11     // because the definition of f is (first + 1)
                                    // this drops the second param of 10
    c added in = f(11, 10) = 12     // again, adding 1 instead of the actual 10 count
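The breakdown above can be reproduced without a cluster. What follows is a minimal sketch in plain Python that mimics the two steps: a reduceLeft-style fold inside each partition, then a merge of the per-partition results with the same function. The helper name simulate_reduce and the partition contents are assumptions made for illustration, not Spark API.

```python
from functools import reduce

def plusone(acc, v):
    # Same function as in the question: ignores v and adds 1.
    return acc + 1

def simulate_reduce(partitions, f):
    # Hypothetical helper (not Spark API): reduce each non-empty partition
    # locally, then merge the per-partition results with the same function f.
    local_results = [reduce(f, part) for part in partitions if part]
    return reduce(f, local_results)

# 40 elements split evenly across 4 partitions, as assumed above.
# Every element is 1.0 so that each local fold happens to yield 10.0;
# with real data the local result would be (first value + 9) instead.
partitions = [[1.0] * 10 for _ in range(4)]

merged = simulate_reduce(partitions, plusone)
true_count = sum(len(part) for part in partitions)
print(merged, true_count)  # 13.0 40
```

The merge step only adds 1 per partition, so the result grows with the number of partitions rather than with the number of elements.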

So, you should prefer count, or do as Daniel suggests and map, or you have a third option, aggregate:

    rdd.aggregate(0)((acc, _) => acc + 1, _ + _)

This will seed the count with 0, keep adding 1 to the accumulator locally, and then add the 2 accumulators together in the merge.
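Since the question uses PySpark, the map and aggregate alternatives might look like the sketch below. To keep it runnable without a cluster, it simulates partitions with plain Python lists. The helper simulate_aggregate and the sample values are hypothetical, and the corresponding PySpark calls appear only in comments, assuming splits is the RDD from the question.

```python
from functools import reduce
from operator import add

def simulate_aggregate(partitions, zero, seq_op, comb_op):
    # Hypothetical stand-in for RDD.aggregate: fold each partition from
    # the zero value with seq_op, then merge the partials with comb_op.
    partials = [reduce(seq_op, part, zero) for part in partitions]
    return reduce(comb_op, partials, zero)

# Made-up sample data: unevenly sized partitions, one of them empty.
partitions = [[1.2, 0.5, 3.1], [2.0, 4.4], [], [0.9]]

# Option 1: map every element to 1, then reduce with plain addition,
# which is associative and commutative.
# PySpark equivalent: splits.map(lambda x: 1).reduce(lambda a, b: a + b)
ones = [[1 for _ in part] for part in partitions]
local_sums = [reduce(add, part) for part in ones if part]
map_reduce_count = reduce(add, local_sums)

# Option 2: aggregate with separate per-element and merge functions.
# PySpark equivalent:
#   splits.aggregate(0, lambda acc, _: acc + 1, lambda a, b: a + b)
aggregate_count = simulate_aggregate(
    partitions, 0, lambda acc, _: acc + 1, add
)

print(map_reduce_count, aggregate_count)  # 6 6
```

Both variants keep the per-element step separate from the merge step, which is why they stay correct regardless of how the data is partitioned.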

