1   
   2   
   3   
   4   
   5   
   6   
   7   
   8   
   9   
  10   
  11   
  12   
  13   
  14   
  15   
  16   
  17   
  18  from base64 import standard_b64encode as b64enc 
  19  import copy 
  20  from collections import defaultdict 
  21  from collections import namedtuple 
  22  from itertools import chain, ifilter, imap 
  23  import operator 
  24  import os 
  25  import sys 
  26  import shlex 
  27  import traceback 
  28  from subprocess import Popen, PIPE 
  29  from tempfile import NamedTemporaryFile 
  30  from threading import Thread 
  31  import warnings 
  32  import heapq 
  33  import bisect 
  34  from random import Random 
  35  from math import sqrt, log, isinf, isnan 
  36   
  37  from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \ 
  38      BatchedSerializer, CloudPickleSerializer, PairDeserializer, \ 
  39      PickleSerializer, pack_long, CompressedSerializer 
  40  from pyspark.join import python_join, python_left_outer_join, \ 
  41      python_right_outer_join, python_cogroup 
  42  from pyspark.statcounter import StatCounter 
  43  from pyspark.rddsampler import RDDSampler, RDDStratifiedSampler 
  44  from pyspark.storagelevel import StorageLevel 
  45  from pyspark.resultiterable import ResultIterable 
  46  from pyspark.shuffle import Aggregator, InMemoryMerger, ExternalMerger, \ 
  47      get_used_memory 
  48   
  49  from py4j.java_collections import ListConverter, MapConverter 
  50   
  51  __all__ = ["RDD"] 
  57      """ 
  58      This function returns consistent hash code for builtin types, especially 
  59      for None and tuple with None. 
  60   
  61      The algrithm is similar to that one used by CPython 2.7 
  62   
  63      >>> portable_hash(None) 
  64      0 
  65      >>> portable_hash((None, 1)) 
  66      219750521 
  67      """ 
  68      if x is None: 
  69          return 0 
  70      if isinstance(x, tuple): 
  71          h = 0x345678 
  72          for i in x: 
  73              h ^= portable_hash(i) 
  74              h *= 1000003 
  75              h &= 0xffffffff 
  76          h ^= len(x) 
  77          if h == -1: 
  78              h = -2 
  79          return h 
  80      return hash(x) 
   81   
  84      """ 
  85      This function returns the traceback info for a callsite, returns a dict 
  86      with function name, file name and line number 
  87      """ 
  88      tb = traceback.extract_stack() 
  89      callsite = namedtuple("Callsite", "function file linenum") 
  90      if len(tb) == 0: 
  91          return None 
  92      file, line, module, what = tb[len(tb) - 1] 
  93      sparkpath = os.path.dirname(file) 
  94      first_spark_frame = len(tb) - 1 
  95      for i in range(0, len(tb)): 
  96          file, line, fun, what = tb[i] 
  97          if file.startswith(sparkpath): 
  98              first_spark_frame = i 
  99              break 
 100      if first_spark_frame == 0: 
 101          file, line, fun, what = tb[0] 
 102          return callsite(function=fun, file=file, linenum=line) 
 103      sfile, sline, sfun, swhat = tb[first_spark_frame] 
 104      ufile, uline, ufun, uwhat = tb[first_spark_frame - 1] 
 105      return callsite(function=sfun, file=ufile, linenum=uline) 
  106   
 107  _spark_stack_depth = 0 
 111   
 113          tb = _extract_concise_traceback() 
 114          if tb is not None: 
 115              self._traceback = "%s at %s:%s" % ( 
 116                  tb.function, tb.file, tb.linenum) 
 117          else: 
 118              self._traceback = "Error! Could not extract traceback info" 
 119          self._context = sc 
  120   
 126   
  132   
 135   
 136      """ 
 137      An implementation of MaxHeap. 
 138   
 139      >>> import pyspark.rdd 
 140      >>> heap = pyspark.rdd.MaxHeapQ(5) 
 141      >>> [heap.insert(i) for i in range(10)] 
 142      [None, None, None, None, None, None, None, None, None, None] 
 143      >>> sorted(heap.getElements()) 
 144      [0, 1, 2, 3, 4] 
 145      >>> heap = pyspark.rdd.MaxHeapQ(5) 
 146      >>> [heap.insert(i) for i in range(9, -1, -1)] 
 147      [None, None, None, None, None, None, None, None, None, None] 
 148      >>> sorted(heap.getElements()) 
 149      [0, 1, 2, 3, 4] 
 150      >>> heap = pyspark.rdd.MaxHeapQ(1) 
 151      >>> [heap.insert(i) for i in range(9, -1, -1)] 
 152      [None, None, None, None, None, None, None, None, None, None] 
 153      >>> heap.getElements() 
 154      [0] 
 155      """ 
 156   
 158           
 159          self.q = [0] 
 160          self.maxsize = maxsize 
  161   
 163          while (k > 1) and (self.q[k / 2] < self.q[k]): 
 164              self._swap(k, k / 2) 
 165              k = k / 2 
  166   
 168          t = self.q[i] 
 169          self.q[i] = self.q[j] 
 170          self.q[j] = t 
  171   
 173          N = self.size() 
 174          while 2 * k <= N: 
 175              j = 2 * k 
 176               
 177               
 178              if j < N and self.q[j] < self.q[j + 1]: 
 179                  j = j + 1 
 180              if(self.q[k] > self.q[j]): 
 181                  break 
 182              self._swap(k, j) 
 183              k = j 
  184   
 186          return len(self.q) - 1 
  187   
 189          if (self.size()) < self.maxsize: 
 190              self.q.append(value) 
 191              self._swim(self.size()) 
 192          else: 
 193              self._replaceRoot(value) 
  194   
 197   
 199          if(self.q[1] > value): 
 200              self.q[1] = value 
 201              self._sink(1) 
   202   
 205      """ 
 206      Parse a memory string in the format supported by Java (e.g. 1g, 200m) and 
 207      return the value in MB 
 208   
 209      >>> _parse_memory("256m") 
 210      256 
 211      >>> _parse_memory("2g") 
 212      2048 
 213      """ 
 214      units = {'g': 1024, 'm': 1, 't': 1 << 20, 'k': 1.0 / 1024} 
 215      if s[-1] not in units: 
 216          raise ValueError("invalid format: " + s) 
 217      return int(float(s[:-1]) * units[s[-1].lower()]) 
  218   
 219   
 220 -class RDD(object): 
  221   
 222      """ 
 223      A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. 
 224      Represents an immutable, partitioned collection of elements that can be 
 225      operated on in parallel. 
 226      """ 
 227   
 228 -    def __init__(self, jrdd, ctx, jrdd_deserializer): 
  229          self._jrdd = jrdd 
 230          self.is_cached = False 
 231          self.is_checkpointed = False 
 232          self.ctx = ctx 
 233          self._jrdd_deserializer = jrdd_deserializer 
 234          self._id = jrdd.id() 
  235   
 242   
 244          """ 
 245          A unique ID for this RDD (within its SparkContext). 
 246          """ 
 247          return self._id 
  248   
 250          return self._jrdd.toString() 
  251   
 252      @property 
 254          """ 
 255          The L{SparkContext} that this RDD was created on. 
 256          """ 
 257          return self.ctx 
  258   
 260          """ 
 261          Persist this RDD with the default storage level (C{MEMORY_ONLY_SER}). 
 262          """ 
 263          self.is_cached = True 
 264          self.persist(StorageLevel.MEMORY_ONLY_SER) 
 265          return self 
  266   
 268          """ 
 269          Set this RDD's storage level to persist its values across operations 
 270          after the first time it is computed. This can only be used to assign 
 271          a new storage level if the RDD does not have a storage level set yet. 
 272          """ 
 273          self.is_cached = True 
 274          javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel) 
 275          self._jrdd.persist(javaStorageLevel) 
 276          return self 
  277   
 279          """ 
 280          Mark the RDD as non-persistent, and remove all blocks for it from 
 281          memory and disk. 
 282          """ 
 283          self.is_cached = False 
 284          self._jrdd.unpersist() 
 285          return self 
  286   
 288          """ 
 289          Mark this RDD for checkpointing. It will be saved to a file inside the 
 290          checkpoint directory set with L{SparkContext.setCheckpointDir()} and 
 291          all references to its parent RDDs will be removed. This function must 
 292          be called before any job has been executed on this RDD. It is strongly 
 293          recommended that this RDD is persisted in memory, otherwise saving it 
 294          on a file will require recomputation. 
 295          """ 
 296          self.is_checkpointed = True 
 297          self._jrdd.rdd().checkpoint() 
  298   
 300          """ 
 301          Return whether this RDD has been checkpointed or not 
 302          """ 
 303          return self._jrdd.rdd().isCheckpointed() 
  304   
 306          """ 
 307          Gets the name of the file to which this RDD was checkpointed 
 308          """ 
 309          checkpointFile = self._jrdd.rdd().getCheckpointFile() 
 310          if checkpointFile.isDefined(): 
 311              return checkpointFile.get() 
 312          else: 
 313              return None 
  314   
 315 -    def map(self, f, preservesPartitioning=False): 
  316          """ 
 317          Return a new RDD by applying a function to each element of this RDD. 
 318   
 319          >>> rdd = sc.parallelize(["b", "a", "c"]) 
 320          >>> sorted(rdd.map(lambda x: (x, 1)).collect()) 
 321          [('a', 1), ('b', 1), ('c', 1)] 
 322          """ 
 323          def func(_, iterator): 
 324              return imap(f, iterator) 
  325          return self.mapPartitionsWithIndex(func, preservesPartitioning) 
  326   
 327 -    def flatMap(self, f, preservesPartitioning=False): 
  328          """ 
 329          Return a new RDD by first applying a function to all elements of this 
 330          RDD, and then flattening the results. 
 331   
 332          >>> rdd = sc.parallelize([2, 3, 4]) 
 333          >>> sorted(rdd.flatMap(lambda x: range(1, x)).collect()) 
 334          [1, 1, 1, 2, 2, 3] 
 335          >>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect()) 
 336          [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)] 
 337          """ 
 338          def func(s, iterator): 
 339              return chain.from_iterable(imap(f, iterator)) 
  340          return self.mapPartitionsWithIndex(func, preservesPartitioning) 
 341   
 343          """ 
 344          Return a new RDD by applying a function to each partition of this RDD. 
 345   
 346          >>> rdd = sc.parallelize([1, 2, 3, 4], 2) 
 347          >>> def f(iterator): yield sum(iterator) 
 348          >>> rdd.mapPartitions(f).collect() 
 349          [3, 7] 
 350          """ 
 351          def func(s, iterator): 
 352              return f(iterator) 
  353          return self.mapPartitionsWithIndex(func) 
 354   
 356          """ 
 357          Return a new RDD by applying a function to each partition of this RDD, 
 358          while tracking the index of the original partition. 
 359   
 360          >>> rdd = sc.parallelize([1, 2, 3, 4], 4) 
 361          >>> def f(splitIndex, iterator): yield splitIndex 
 362          >>> rdd.mapPartitionsWithIndex(f).sum() 
 363          6 
 364          """ 
 365          return PipelinedRDD(self, f, preservesPartitioning) 
  366   
 368          """ 
 369          Deprecated: use mapPartitionsWithIndex instead. 
 370   
 371          Return a new RDD by applying a function to each partition of this RDD, 
 372          while tracking the index of the original partition. 
 373   
 374          >>> rdd = sc.parallelize([1, 2, 3, 4], 4) 
 375          >>> def f(splitIndex, iterator): yield splitIndex 
 376          >>> rdd.mapPartitionsWithSplit(f).sum() 
 377          6 
 378          """ 
 379          warnings.warn("mapPartitionsWithSplit is deprecated; " 
 380                        "use mapPartitionsWithIndex instead", DeprecationWarning, stacklevel=2) 
 381          return self.mapPartitionsWithIndex(f, preservesPartitioning) 
  382   
 384          """ 
 385          Returns the number of partitions in RDD 
 386   
 387          >>> rdd = sc.parallelize([1, 2, 3, 4], 2) 
 388          >>> rdd.getNumPartitions() 
 389          2 
 390          """ 
 391          return self._jrdd.partitions().size() 
  392   
 394          """ 
 395          Return a new RDD containing only the elements that satisfy a predicate. 
 396   
 397          >>> rdd = sc.parallelize([1, 2, 3, 4, 5]) 
 398          >>> rdd.filter(lambda x: x % 2 == 0).collect() 
 399          [2, 4] 
 400          """ 
 401          def func(iterator): 
 402              return ifilter(f, iterator) 
  403          return self.mapPartitions(func) 
 404   
 406          """ 
 407          Return a new RDD containing the distinct elements in this RDD. 
 408   
 409          >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect()) 
 410          [1, 2, 3] 
 411          """ 
 412          return self.map(lambda x: (x, None)) \ 
 413                     .reduceByKey(lambda x, _: x) \ 
 414                     .map(lambda (x, _): x) 
  415   
 416 -    def sample(self, withReplacement, fraction, seed=None): 
  417          """ 
 418          Return a sampled subset of this RDD (relies on numpy and falls back 
 419          on default random generator if numpy is unavailable). 
 420          """ 
 421          assert fraction >= 0.0, "Negative fraction value: %s" % fraction 
 422          return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True) 
  423   
 424       
 425 -    def takeSample(self, withReplacement, num, seed=None): 
  426          """ 
 427          Return a fixed-size sampled subset of this RDD (currently requires 
 428          numpy). 
 429   
 430          >>> rdd = sc.parallelize(range(0, 10)) 
 431          >>> len(rdd.takeSample(True, 20, 1)) 
 432          20 
 433          >>> len(rdd.takeSample(False, 5, 2)) 
 434          5 
 435          >>> len(rdd.takeSample(False, 15, 3)) 
 436          10 
 437          """ 
 438          numStDev = 10.0 
 439   
 440          if num < 0: 
 441              raise ValueError("Sample size cannot be negative.") 
 442          elif num == 0: 
 443              return [] 
 444   
 445          initialCount = self.count() 
 446          if initialCount == 0: 
 447              return [] 
 448   
 449          rand = Random(seed) 
 450   
 451          if (not withReplacement) and num >= initialCount: 
 452               
 453              samples = self.collect() 
 454              rand.shuffle(samples) 
 455              return samples 
 456   
 457          maxSampleSize = sys.maxint - int(numStDev * sqrt(sys.maxint)) 
 458          if num > maxSampleSize: 
 459              raise ValueError( 
 460                  "Sample size cannot be greater than %d." % maxSampleSize) 
 461   
 462          fraction = RDD._computeFractionForSampleSize( 
 463              num, initialCount, withReplacement) 
 464          samples = self.sample(withReplacement, fraction, seed).collect() 
 465   
 466           
 467           
 468           
 469          while len(samples) < num: 
 470               
 471              seed = rand.randint(0, sys.maxint) 
 472              samples = self.sample(withReplacement, fraction, seed).collect() 
 473   
 474          rand.shuffle(samples) 
 475   
 476          return samples[0:num] 
  477   
 478      @staticmethod 
 480          """ 
 481          Returns a sampling rate that guarantees a sample of 
 482          size >= sampleSizeLowerBound 99.99% of the time. 
 483   
 484          How the sampling rate is determined: 
 485          Let p = num / total, where num is the sample size and total is the 
 486          total number of data points in the RDD. We're trying to compute 
 487          q > p such that 
 488            - when sampling with replacement, we're drawing each data point 
 489              with prob_i ~ Pois(q), where we want to guarantee 
 490              Pr[s < num] < 0.0001 for s = sum(prob_i for i from 0 to 
 491              total), i.e. the failure rate of not having a sufficiently large 
 492              sample < 0.0001. Setting q = p + 5 * sqrt(p/total) is sufficient 
 493              to guarantee 0.9999 success rate for num > 12, but we need a 
 494              slightly larger q (9 empirically determined). 
 495            - when sampling without replacement, we're drawing each data point 
 496              with prob_i ~ Binomial(total, fraction) and our choice of q 
 497              guarantees 1-delta, or 0.9999 success rate, where success rate is 
 498              defined the same as in sampling with replacement. 
 499          """ 
 500          fraction = float(sampleSizeLowerBound) / total 
 501          if withReplacement: 
 502              numStDev = 5 
 503              if (sampleSizeLowerBound < 12): 
 504                  numStDev = 9 
 505              return fraction + numStDev * sqrt(fraction / total) 
 506          else: 
 507              delta = 0.00005 
 508              gamma = - log(delta) / total 
 509              return min(1, fraction + gamma + sqrt(gamma * gamma + 2 * gamma * fraction)) 
  510   
 512          """ 
 513          Return the union of this RDD and another one. 
 514   
 515          >>> rdd = sc.parallelize([1, 1, 2, 3]) 
 516          >>> rdd.union(rdd).collect() 
 517          [1, 1, 2, 3, 1, 1, 2, 3] 
 518          """ 
 519          if self._jrdd_deserializer == other._jrdd_deserializer: 
 520              rdd = RDD(self._jrdd.union(other._jrdd), self.ctx, 
 521                        self._jrdd_deserializer) 
 522              return rdd 
 523          else: 
 524               
 525               
 526              self_copy = self._reserialize() 
 527              other_copy = other._reserialize() 
 528              return RDD(self_copy._jrdd.union(other_copy._jrdd), self.ctx, 
 529                         self.ctx.serializer) 
  530   
 532          """ 
 533          Return the intersection of this RDD and another one. The output will 
 534          not contain any duplicate elements, even if the input RDDs did. 
 535   
 536          Note that this method performs a shuffle internally. 
 537   
 538          >>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5]) 
 539          >>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8]) 
 540          >>> rdd1.intersection(rdd2).collect() 
 541          [1, 2, 3] 
 542          """ 
 543          return self.map(lambda v: (v, None)) \ 
 544              .cogroup(other.map(lambda v: (v, None))) \ 
 545              .filter(lambda x: (len(x[1][0]) != 0) and (len(x[1][1]) != 0)) \ 
 546              .keys() 
  547   
 549          serializer = serializer or self.ctx.serializer 
 550          if self._jrdd_deserializer == serializer: 
 551              return self 
 552          else: 
 553              converted = self.map(lambda x: x, preservesPartitioning=True) 
 554              converted._jrdd_deserializer = serializer 
 555              return converted 
  556   
 558          """ 
 559          Return the union of this RDD and another one. 
 560   
 561          >>> rdd = sc.parallelize([1, 1, 2, 3]) 
 562          >>> (rdd + rdd).collect() 
 563          [1, 1, 2, 3, 1, 1, 2, 3] 
 564          """ 
 565          if not isinstance(other, RDD): 
 566              raise TypeError 
 567          return self.union(other) 
  568   
 569 -    def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x): 
  570          """ 
 571          Sorts this RDD, which is assumed to consist of (key, value) pairs. 
 572          # noqa 
 573   
 574          >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)] 
 575          >>> sc.parallelize(tmp).sortByKey().first() 
 576          ('1', 3) 
 577          >>> sc.parallelize(tmp).sortByKey(True, 1).collect() 
 578          [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)] 
 579          >>> sc.parallelize(tmp).sortByKey(True, 2).collect() 
 580          [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)] 
 581          >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)] 
 582          >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)]) 
 583          >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect() 
 584          [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5),...('white', 9), ('whose', 6)] 
 585          """ 
 586          if numPartitions is None: 
 587              numPartitions = self._defaultReducePartitions() 
 588   
 589          def sortPartition(iterator): 
 590              return iter(sorted(iterator, key=lambda (k, v): keyfunc(k), reverse=not ascending)) 
  591   
 592          if numPartitions == 1: 
 593              if self.getNumPartitions() > 1: 
 594                  self = self.coalesce(1) 
 595              return self.mapPartitions(sortPartition) 
 596   
 597           
 598           
 599           
 600          rddSize = self.count() 
 601          if not rddSize: 
 602              return self   
 603          maxSampleSize = numPartitions * 20.0   
 604          fraction = min(maxSampleSize / max(rddSize, 1), 1.0) 
 605          samples = self.sample(False, fraction, 1).map(lambda (k, v): k).collect() 
 606          samples = sorted(samples, reverse=(not ascending), key=keyfunc) 
 607   
 608           
 609           
 610          bounds = [samples[len(samples) * (i + 1) / numPartitions] 
 611                    for i in range(0, numPartitions - 1)] 
 612   
 613          def rangePartitioner(k): 
 614              p = bisect.bisect_left(bounds, keyfunc(k)) 
 615              if ascending: 
 616                  return p 
 617              else: 
 618                  return numPartitions - 1 - p 
 619   
 620          return self.partitionBy(numPartitions, rangePartitioner).mapPartitions(sortPartition, True) 
 621   
 622 -    def sortBy(self, keyfunc, ascending=True, numPartitions=None): 
  623          """ 
 624          Sorts this RDD by the given keyfunc 
 625   
 626          >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)] 
 627          >>> sc.parallelize(tmp).sortBy(lambda x: x[0]).collect() 
 628          [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)] 
 629          >>> sc.parallelize(tmp).sortBy(lambda x: x[1]).collect() 
 630          [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)] 
 631          """ 
 632          return self.keyBy(keyfunc).sortByKey(ascending, numPartitions).values() 
  633   
 635          """ 
 636          Return an RDD created by coalescing all elements within each partition 
 637          into a list. 
 638   
 639          >>> rdd = sc.parallelize([1, 2, 3, 4], 2) 
 640          >>> sorted(rdd.glom().collect()) 
 641          [[1, 2], [3, 4]] 
 642          """ 
 643          def func(iterator): 
 644              yield list(iterator) 
  645          return self.mapPartitions(func) 
 646   
 648          """ 
 649          Return the Cartesian product of this RDD and another one, that is, the 
 650          RDD of all pairs of elements C{(a, b)} where C{a} is in C{self} and 
 651          C{b} is in C{other}. 
 652   
 653          >>> rdd = sc.parallelize([1, 2]) 
 654          >>> sorted(rdd.cartesian(rdd).collect()) 
 655          [(1, 1), (1, 2), (2, 1), (2, 2)] 
 656          """ 
 657           
 658          deserializer = CartesianDeserializer(self._jrdd_deserializer, 
 659                                               other._jrdd_deserializer) 
 660          return RDD(self._jrdd.cartesian(other._jrdd), self.ctx, deserializer) 
  661   
 662 -    def groupBy(self, f, numPartitions=None): 
  663          """ 
 664          Return an RDD of grouped items. 
 665   
 666          >>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8]) 
 667          >>> result = rdd.groupBy(lambda x: x % 2).collect() 
 668          >>> sorted([(x, sorted(y)) for (x, y) in result]) 
 669          [(0, [2, 8]), (1, [1, 1, 3, 5])] 
 670          """ 
 671          return self.map(lambda x: (f(x), x)).groupByKey(numPartitions) 
  672   
 673 -    def pipe(self, command, env={}): 
  674          """ 
 675          Return an RDD created by piping elements to a forked external process. 
 676   
 677          >>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect() 
 678          ['1', '2', '', '3'] 
 679          """ 
 680          def func(iterator): 
 681              pipe = Popen( 
 682                  shlex.split(command), env=env, stdin=PIPE, stdout=PIPE) 
 683   
 684              def pipe_objs(out): 
 685                  for obj in iterator: 
 686                      out.write(str(obj).rstrip('\n') + '\n') 
 687                  out.close() 
  688              Thread(target=pipe_objs, args=[pipe.stdin]).start() 
 689              return (x.rstrip('\n') for x in iter(pipe.stdout.readline, '')) 
 690          return self.mapPartitions(func) 
 691   
 693          """ 
 694          Applies a function to all elements of this RDD. 
 695   
 696          >>> def f(x): print x 
 697          >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f) 
 698          """ 
 699          def processPartition(iterator): 
 700              for x in iterator: 
 701                  f(x) 
 702              yield None 
  703          self.mapPartitions(processPartition).collect()   
 704   
 706          """ 
 707          Applies a function to each partition of this RDD. 
 708   
 709          >>> def f(iterator): 
 710          ...      for x in iterator: 
 711          ...           print x 
 712          ...      yield None 
 713          >>> sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f) 
 714          """ 
 715          self.mapPartitions(f).collect()   
  716   
 718          """ 
 719          Return a list that contains all of the elements in this RDD. 
 720          """ 
 721          with _JavaStackTrace(self.context) as st: 
 722              bytesInJava = self._jrdd.collect().iterator() 
 723          return list(self._collect_iterator_through_file(bytesInJava)) 
  724   
 726           
 727           
 728           
 729          tempFile = NamedTemporaryFile(delete=False, dir=self.ctx._temp_dir) 
 730          tempFile.close() 
 731          self.ctx._writeToFile(iterator, tempFile.name) 
 732           
 733          with open(tempFile.name, 'rb') as tempFile: 
 734              for item in self._jrdd_deserializer.load_stream(tempFile): 
 735                  yield item 
 736          os.unlink(tempFile.name) 
  737   
 739          """ 
 740          Reduces the elements of this RDD using the specified commutative and 
 741          associative binary operator. Currently reduces partitions locally. 
 742   
 743          >>> from operator import add 
 744          >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add) 
 745          15 
 746          >>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add) 
 747          10 
 748          """ 
 749          def func(iterator): 
 750              acc = None 
 751              for obj in iterator: 
 752                  if acc is None: 
 753                      acc = obj 
 754                  else: 
 755                      acc = f(obj, acc) 
 756              if acc is not None: 
 757                  yield acc 
  758          vals = self.mapPartitions(func).collect() 
 759          return reduce(f, vals) 
 760   
 761 -    def fold(self, zeroValue, op): 
  762          """ 
 763          Aggregate the elements of each partition, and then the results for all 
 764          the partitions, using a given associative function and a neutral "zero 
 765          value." 
 766   
 767          The function C{op(t1, t2)} is allowed to modify C{t1} and return it 
 768          as its result value to avoid object allocation; however, it should not 
 769          modify C{t2}. 
 770   
 771          >>> from operator import add 
 772          >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add) 
 773          15 
 774          """ 
 775          def func(iterator): 
 776              acc = zeroValue 
 777              for obj in iterator: 
 778                  acc = op(obj, acc) 
 779              yield acc 
  780          vals = self.mapPartitions(func).collect() 
 781          return reduce(op, vals, zeroValue) 
 782   
 783 -    def aggregate(self, zeroValue, seqOp, combOp): 
  784          """ 
 785          Aggregate the elements of each partition, and then the results for all 
 786          the partitions, using a given combine functions and a neutral "zero 
 787          value." 
 788   
 789          The functions C{op(t1, t2)} is allowed to modify C{t1} and return it 
 790          as its result value to avoid object allocation; however, it should not 
 791          modify C{t2}. 
 792   
 793          The first function (seqOp) can return a different result type, U, than 
 794          the type of this RDD. Thus, we need one operation for merging a T into 
 795          an U and one operation for merging two U 
 796   
 797          >>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1)) 
 798          >>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1])) 
 799          >>> sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp) 
 800          (10, 4) 
 801          >>> sc.parallelize([]).aggregate((0, 0), seqOp, combOp) 
 802          (0, 0) 
 803          """ 
 804          def func(iterator): 
 805              acc = zeroValue 
 806              for obj in iterator: 
 807                  acc = seqOp(acc, obj) 
 808              yield acc 
  809   
 810          return self.mapPartitions(func).fold(zeroValue, combOp) 
 811   
 813          """ 
 814          Find the maximum item in this RDD. 
 815   
 816          >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).max() 
 817          43.0 
 818          """ 
 819          return self.reduce(max) 
  820   
 822          """ 
 823          Find the minimum item in this RDD. 
 824   
 825          >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).min() 
 826          1.0 
 827          """ 
 828          return self.reduce(min) 
  829   
 831          """ 
 832          Add up the elements in this RDD. 
 833   
 834          >>> sc.parallelize([1.0, 2.0, 3.0]).sum() 
 835          6.0 
 836          """ 
 837          return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add) 
  838   
 840          """ 
 841          Return the number of elements in this RDD. 
 842   
 843          >>> sc.parallelize([2, 3, 4]).count() 
 844          3 
 845          """ 
 846          return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() 
  847   
 849          """ 
 850          Return a L{StatCounter} object that captures the mean, variance 
 851          and count of the RDD's elements in one operation. 
 852          """ 
 853          def redFunc(left_counter, right_counter): 
 854              return left_counter.mergeStats(right_counter) 
  855   
 856          return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc) 
 857   
 859          """ 
 860          Compute a histogram using the provided buckets. The buckets 
 861          are all open to the right except for the last which is closed. 
 862          e.g. [1,10,20,50] means the buckets are [1,10) [10,20) [20,50], 
 863          which means 1<=x<10, 10<=x<20, 20<=x<=50. And on the input of 1 
 864          and 50 we would have a histogram of 1,0,1. 
 865   
 866          If your histogram is evenly spaced (e.g. [0, 10, 20, 30]), 
 867          this can be switched from an O(log n) inseration to O(1) per 
 868          element(where n = # buckets). 
 869   
 870          Buckets must be sorted and not contain any duplicates, must be 
 871          at least two elements. 
 872   
 873          If `buckets` is a number, it will generates buckets which are 
 874          evenly spaced between the minimum and maximum of the RDD. For 
 875          example, if the min value is 0 and the max is 100, given buckets 
 876          as 2, the resulting buckets will be [0,50) [50,100]. buckets must 
 877          be at least 1 If the RDD contains infinity, NaN throws an exception 
 878          If the elements in RDD do not vary (max == min) always returns 
 879          a single bucket. 
 880   
 881          It will return an tuple of buckets and histogram. 
 882   
 883          >>> rdd = sc.parallelize(range(51)) 
 884          >>> rdd.histogram(2) 
 885          ([0, 25, 50], [25, 26]) 
 886          >>> rdd.histogram([0, 5, 25, 50]) 
 887          ([0, 5, 25, 50], [5, 20, 26]) 
 888          >>> rdd.histogram([0, 15, 30, 45, 60])  # evenly spaced buckets 
 889          ([0, 15, 30, 45, 60], [15, 15, 15, 6]) 
 890          >>> rdd = sc.parallelize(["ab", "ac", "b", "bd", "ef"]) 
 891          >>> rdd.histogram(("a", "b", "c")) 
 892          (('a', 'b', 'c'), [2, 2]) 
 893          """ 
 894   
 895          if isinstance(buckets, (int, long)): 
 896              if buckets < 1: 
 897                  raise ValueError("number of buckets must be >= 1") 
 898   
 899               
 900              def comparable(x): 
 901                  if x is None: 
 902                      return False 
 903                  if type(x) is float and isnan(x): 
 904                      return False 
 905                  return True 
  906   
 907              filtered = self.filter(comparable) 
 908   
 909               
 910              def minmax(a, b): 
 911                  return min(a[0], b[0]), max(a[1], b[1]) 
 912              try: 
 913                  minv, maxv = filtered.map(lambda x: (x, x)).reduce(minmax) 
 914              except TypeError as e: 
 915                  if " empty " in str(e): 
 916                      raise ValueError("can not generate buckets from empty RDD") 
 917                  raise 
 918   
 919              if minv == maxv or buckets == 1: 
 920                  return [minv, maxv], [filtered.count()] 
 921   
 922              try: 
 923                  inc = (maxv - minv) / buckets 
 924              except TypeError: 
 925                  raise TypeError("Can not generate buckets with non-number in RDD") 
 926   
 927              if isinf(inc): 
 928                  raise ValueError("Can not generate buckets with infinite value") 
 929   
 930               
 931              if inc * buckets != maxv - minv: 
 932                  inc = (maxv - minv) * 1.0 / buckets 
 933   
 934              buckets = [i * inc + minv for i in range(buckets)] 
 935              buckets.append(maxv)   
 936              even = True 
 937   
 938          elif isinstance(buckets, (list, tuple)): 
 939              if len(buckets) < 2: 
 940                  raise ValueError("buckets should have more than one value") 
 941   
 942              if any(i is None or isinstance(i, float) and isnan(i) for i in buckets): 
 943                  raise ValueError("can not have None or NaN in buckets") 
 944   
 945              if sorted(buckets) != list(buckets): 
 946                  raise ValueError("buckets should be sorted") 
 947   
 948              if len(set(buckets)) != len(buckets): 
 949                  raise ValueError("buckets should not contain duplicated values") 
 950   
 951              minv = buckets[0] 
 952              maxv = buckets[-1] 
 953              even = False 
 954              inc = None 
 955              try: 
 956                  steps = [buckets[i + 1] - buckets[i] for i in range(len(buckets) - 1)] 
 957              except TypeError: 
 958                  pass   
 959              else: 
 960                  if max(steps) - min(steps) < 1e-10:   
 961                      even = True 
 962                      inc = (maxv - minv) / (len(buckets) - 1) 
 963   
 964          else: 
 965              raise TypeError("buckets should be a list or tuple or number(int or long)") 
 966   
 967          def histogram(iterator): 
 968              counters = [0] * len(buckets) 
 969              for i in iterator: 
 970                  if i is None or (type(i) is float and isnan(i)) or i > maxv or i < minv: 
 971                      continue 
 972                  t = (int((i - minv) / inc) if even 
 973                       else bisect.bisect_right(buckets, i) - 1) 
 974                  counters[t] += 1 
 975               
 976              last = counters.pop() 
 977              counters[-1] += last 
 978              return [counters] 
 979   
 980          def mergeCounters(a, b): 
 981              return [i + j for i, j in zip(a, b)] 
 982   
 983          return buckets, self.mapPartitions(histogram).reduce(mergeCounters) 
 984   
 986          """ 
 987          Compute the mean of this RDD's elements. 
 988   
 989          >>> sc.parallelize([1, 2, 3]).mean() 
 990          2.0 
 991          """ 
 992          return self.stats().mean() 
  993   
 995          """ 
 996          Compute the variance of this RDD's elements. 
 997   
 998          >>> sc.parallelize([1, 2, 3]).variance() 
 999          0.666... 
1000          """ 
1001          return self.stats().variance() 
 1002   
1004          """ 
1005          Compute the standard deviation of this RDD's elements. 
1006   
1007          >>> sc.parallelize([1, 2, 3]).stdev() 
1008          0.816... 
1009          """ 
1010          return self.stats().stdev() 
 1011   
1013          """ 
1014          Compute the sample standard deviation of this RDD's elements (which 
1015          corrects for bias in estimating the standard deviation by dividing by 
1016          N-1 instead of N). 
1017   
1018          >>> sc.parallelize([1, 2, 3]).sampleStdev() 
1019          1.0 
1020          """ 
1021          return self.stats().sampleStdev() 
 1022   
1024          """ 
1025          Compute the sample variance of this RDD's elements (which corrects 
1026          for bias in estimating the variance by dividing by N-1 instead of N). 
1027   
1028          >>> sc.parallelize([1, 2, 3]).sampleVariance() 
1029          1.0 
1030          """ 
1031          return self.stats().sampleVariance() 
 1032   
1034          """ 
1035          Return the count of each unique value in this RDD as a dictionary of 
1036          (value, count) pairs. 
1037   
1038          >>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items()) 
1039          [(1, 2), (2, 3)] 
1040          """ 
1041          def countPartition(iterator): 
1042              counts = defaultdict(int) 
1043              for obj in iterator: 
1044                  counts[obj] += 1 
1045              yield counts 
 1046   
1047          def mergeMaps(m1, m2): 
1048              for (k, v) in m2.iteritems(): 
1049                  m1[k] += v 
1050              return m1 
1051          return self.mapPartitions(countPartition).reduce(mergeMaps) 
1052   
1053 -    def top(self, num): 
 1054          """ 
1055          Get the top N elements from a RDD. 
1056   
1057          Note: It returns the list sorted in descending order. 
1058          >>> sc.parallelize([10, 4, 2, 12, 3]).top(1) 
1059          [12] 
1060          >>> sc.parallelize([2, 3, 4, 5, 6], 2).top(2) 
1061          [6, 5] 
1062          """ 
1063          def topIterator(iterator): 
1064              q = [] 
1065              for k in iterator: 
1066                  if len(q) < num: 
1067                      heapq.heappush(q, k) 
1068                  else: 
1069                      heapq.heappushpop(q, k) 
1070              yield q 
 1071   
1072          def merge(a, b): 
1073              return next(topIterator(a + b)) 
1074   
1075          return sorted(self.mapPartitions(topIterator).reduce(merge), reverse=True) 
1076   
1078          """ 
1079          Get the N elements from a RDD ordered in ascending order or as 
1080          specified by the optional key function. 
1081   
1082          >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6) 
1083          [1, 2, 3, 4, 5, 6] 
1084          >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x) 
1085          [10, 9, 7, 6, 5, 4] 
1086          """ 
1087   
1088          def topNKeyedElems(iterator, key_=None): 
1089              q = MaxHeapQ(num) 
1090              for k in iterator: 
1091                  if key_ is not None: 
1092                      k = (key_(k), k) 
1093                  q.insert(k) 
1094              yield q.getElements() 
 1095   
1096          def unKey(x, key_=None): 
1097              if key_ is not None: 
1098                  x = [i[1] for i in x] 
1099              return x 
1100   
1101          def merge(a, b): 
1102              return next(topNKeyedElems(a + b)) 
1103          result = self.mapPartitions( 
1104              lambda i: topNKeyedElems(i, key)).reduce(merge) 
1105          return sorted(unKey(result, key), key=key) 
1106   
1107 -    def take(self, num): 
 1108          """ 
1109          Take the first num elements of the RDD. 
1110   
1111          It works by first scanning one partition, and use the results from 
1112          that partition to estimate the number of additional partitions needed 
1113          to satisfy the limit. 
1114   
1115          Translated from the Scala implementation in RDD#take(). 
1116   
1117          >>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2) 
1118          [2, 3] 
1119          >>> sc.parallelize([2, 3, 4, 5, 6]).take(10) 
1120          [2, 3, 4, 5, 6] 
1121          >>> sc.parallelize(range(100), 100).filter(lambda x: x > 90).take(3) 
1122          [91, 92, 93] 
1123          """ 
1124          items = [] 
1125          totalParts = self._jrdd.partitions().size() 
1126          partsScanned = 0 
1127   
1128          while len(items) < num and partsScanned < totalParts: 
1129               
1130               
1131               
1132              numPartsToTry = 1 
1133              if partsScanned > 0: 
1134                   
1135                   
1136                   
1137                  if len(items) == 0: 
1138                      numPartsToTry = partsScanned * 4 
1139                  else: 
1140                      numPartsToTry = int(1.5 * num * partsScanned / len(items)) 
1141   
1142              left = num - len(items) 
1143   
1144              def takeUpToNumLeft(iterator): 
1145                  taken = 0 
1146                  while taken < left: 
1147                      yield next(iterator) 
1148                      taken += 1 
 1149   
1150              p = range( 
1151                  partsScanned, min(partsScanned + numPartsToTry, totalParts)) 
1152              res = self.context.runJob(self, takeUpToNumLeft, p, True) 
1153   
1154              items += res 
1155              partsScanned += numPartsToTry 
1156   
1157          return items[:num] 
1158   
1160          """ 
1161          Return the first element in this RDD. 
1162   
1163          >>> sc.parallelize([2, 3, 4]).first() 
1164          2 
1165          """ 
1166          return self.take(1)[0] 
 1167   
1169          """ 
1170          Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file 
1171          system, using the new Hadoop OutputFormat API (mapreduce package). Keys/values are 
1172          converted for output using either user specified converters or, by default, 
1173          L{org.apache.spark.api.python.JavaToWritableConverter}. 
1174   
1175          @param conf: Hadoop job configuration, passed in as a dict 
1176          @param keyConverter: (None by default) 
1177          @param valueConverter: (None by default) 
1178          """ 
1179          jconf = self.ctx._dictToJavaMap(conf) 
1180          pickledRDD = self._toPickleSerialization() 
1181          batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer) 
1182          self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, batched, jconf, 
1183                                                      keyConverter, valueConverter, True) 
 1184   
1185 -    def saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None, 
1186                                 keyConverter=None, valueConverter=None, conf=None): 
 1187          """ 
1188          Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file 
1189          system, using the new Hadoop OutputFormat API (mapreduce package). Key and value types 
1190          will be inferred if not specified. Keys and values are converted for output using either 
1191          user specified converters or L{org.apache.spark.api.python.JavaToWritableConverter}. The 
1192          C{conf} is applied on top of the base Hadoop conf associated with the SparkContext 
1193          of this RDD to create a merged Hadoop MapReduce job configuration for saving the data. 
1194   
1195          @param path: path to Hadoop file 
1196          @param outputFormatClass: fully qualified classname of Hadoop OutputFormat 
1197                 (e.g. "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat") 
1198          @param keyClass: fully qualified classname of key Writable class 
1199                 (e.g. "org.apache.hadoop.io.IntWritable", None by default) 
1200          @param valueClass: fully qualified classname of value Writable class 
1201                 (e.g. "org.apache.hadoop.io.Text", None by default) 
1202          @param keyConverter: (None by default) 
1203          @param valueConverter: (None by default) 
1204          @param conf: Hadoop job configuration, passed in as a dict (None by default) 
1205          """ 
1206          jconf = self.ctx._dictToJavaMap(conf) 
1207          pickledRDD = self._toPickleSerialization() 
1208          batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer) 
1209          self.ctx._jvm.PythonRDD.saveAsNewAPIHadoopFile(pickledRDD._jrdd, batched, path, 
1210                                                         outputFormatClass, 
1211                                                         keyClass, valueClass, 
1212                                                         keyConverter, valueConverter, jconf) 
 1213   
1215          """ 
1216          Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file 
1217          system, using the old Hadoop OutputFormat API (mapred package). Keys/values are 
1218          converted for output using either user specified converters or, by default, 
1219          L{org.apache.spark.api.python.JavaToWritableConverter}. 
1220   
1221          @param conf: Hadoop job configuration, passed in as a dict 
1222          @param keyConverter: (None by default) 
1223          @param valueConverter: (None by default) 
1224          """ 
1225          jconf = self.ctx._dictToJavaMap(conf) 
1226          pickledRDD = self._toPickleSerialization() 
1227          batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer) 
1228          self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, batched, jconf, 
1229                                                      keyConverter, valueConverter, False) 
 1230   
1231 -    def saveAsHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None, 
1232                           keyConverter=None, valueConverter=None, conf=None, 
1233                           compressionCodecClass=None): 
 1234          """ 
1235          Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file 
1236          system, using the old Hadoop OutputFormat API (mapred package). Key and value types 
1237          will be inferred if not specified. Keys and values are converted for output using either 
1238          user specified converters or L{org.apache.spark.api.python.JavaToWritableConverter}. The 
1239          C{conf} is applied on top of the base Hadoop conf associated with the SparkContext 
1240          of this RDD to create a merged Hadoop MapReduce job configuration for saving the data. 
1241   
1242          @param path: path to Hadoop file 
1243          @param outputFormatClass: fully qualified classname of Hadoop OutputFormat 
1244                 (e.g. "org.apache.hadoop.mapred.SequenceFileOutputFormat") 
1245          @param keyClass: fully qualified classname of key Writable class 
1246                 (e.g. "org.apache.hadoop.io.IntWritable", None by default) 
1247          @param valueClass: fully qualified classname of value Writable class 
1248                 (e.g. "org.apache.hadoop.io.Text", None by default) 
1249          @param keyConverter: (None by default) 
1250          @param valueConverter: (None by default) 
1251          @param conf: (None by default) 
1252          @param compressionCodecClass: (None by default) 
1253          """ 
1254          jconf = self.ctx._dictToJavaMap(conf) 
1255          pickledRDD = self._toPickleSerialization() 
1256          batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer) 
1257          self.ctx._jvm.PythonRDD.saveAsHadoopFile(pickledRDD._jrdd, batched, path, 
1258                                                   outputFormatClass, 
1259                                                   keyClass, valueClass, 
1260                                                   keyConverter, valueConverter, 
1261                                                   jconf, compressionCodecClass) 
 1262   
1264          """ 
1265          Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file 
1266          system, using the L{org.apache.hadoop.io.Writable} types that we convert from the 
1267          RDD's key and value types. The mechanism is as follows: 
1268              1. Pyrolite is used to convert pickled Python RDD into RDD of Java objects. 
1269              2. Keys and values of this Java RDD are converted to Writables and written out. 
1270   
1271          @param path: path to sequence file 
1272          @param compressionCodecClass: (None by default) 
1273          """ 
1274          pickledRDD = self._toPickleSerialization() 
1275          batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer) 
1276          self.ctx._jvm.PythonRDD.saveAsSequenceFile(pickledRDD._jrdd, batched, 
1277                                                     path, compressionCodecClass) 
 1278   
1280          """ 
1281          Save this RDD as a SequenceFile of serialized objects. The serializer 
1282          used is L{pyspark.serializers.PickleSerializer}, default batch size 
1283          is 10. 
1284   
1285          >>> tmpFile = NamedTemporaryFile(delete=True) 
1286          >>> tmpFile.close() 
1287          >>> sc.parallelize([1, 2, 'spark', 'rdd']).saveAsPickleFile(tmpFile.name, 3) 
1288          >>> sorted(sc.pickleFile(tmpFile.name, 5).collect()) 
1289          [1, 2, 'rdd', 'spark'] 
1290          """ 
1291          self._reserialize(BatchedSerializer(PickleSerializer(), 
1292                                              batchSize))._jrdd.saveAsObjectFile(path) 
 1293   
1294 -    def saveAsTextFile(self, path): 
 1295          """ 
1296          Save this RDD as a text file, using string representations of elements. 
1297   
1298          >>> tempFile = NamedTemporaryFile(delete=True) 
1299          >>> tempFile.close() 
1300          >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name) 
1301          >>> from fileinput import input 
1302          >>> from glob import glob 
1303          >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*")))) 
1304          '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n' 
1305   
1306          Empty lines are tolerated when saving to text files. 
1307   
1308          >>> tempFile2 = NamedTemporaryFile(delete=True) 
1309          >>> tempFile2.close() 
1310          >>> sc.parallelize(['', 'foo', '', 'bar', '']).saveAsTextFile(tempFile2.name) 
1311          >>> ''.join(sorted(input(glob(tempFile2.name + "/part-0000*")))) 
1312          '\\n\\n\\nbar\\nfoo\\n' 
1313          """ 
1314          def func(split, iterator): 
1315              for x in iterator: 
1316                  if not isinstance(x, basestring): 
1317                      x = unicode(x) 
1318                  if isinstance(x, unicode): 
1319                      x = x.encode("utf-8") 
1320                  yield x 
 1321          keyed = self.mapPartitionsWithIndex(func) 
1322          keyed._bypass_serializer = True 
1323          keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path) 
1324   
1325       
1326   
1328          """ 
1329          Return the key-value pairs in this RDD to the master as a dictionary. 
1330   
1331          >>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap() 
1332          >>> m[1] 
1333          2 
1334          >>> m[3] 
1335          4 
1336          """ 
1337          return dict(self.collect()) 
 1338   
1340          """ 
1341          Return an RDD with the keys of each tuple. 
1342   
1343          >>> m = sc.parallelize([(1, 2), (3, 4)]).keys() 
1344          >>> m.collect() 
1345          [1, 3] 
1346          """ 
1347          return self.map(lambda (k, v): k) 
 1348   
1350          """ 
1351          Return an RDD with the values of each tuple. 
1352   
1353          >>> m = sc.parallelize([(1, 2), (3, 4)]).values() 
1354          >>> m.collect() 
1355          [2, 4] 
1356          """ 
1357          return self.map(lambda (k, v): v) 
 1358   
1360          """ 
1361          Merge the values for each key using an associative reduce function. 
1362   
1363          This will also perform the merging locally on each mapper before 
1364          sending results to a reducer, similarly to a "combiner" in MapReduce. 
1365   
1366          Output will be hash-partitioned with C{numPartitions} partitions, or 
1367          the default parallelism level if C{numPartitions} is not specified. 
1368   
1369          >>> from operator import add 
1370          >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 
1371          >>> sorted(rdd.reduceByKey(add).collect()) 
1372          [('a', 2), ('b', 1)] 
1373          """ 
1374          return self.combineByKey(lambda x: x, func, func, numPartitions) 
 1375   
1377          """ 
1378          Merge the values for each key using an associative reduce function, but 
1379          return the results immediately to the master as a dictionary. 
1380   
1381          This will also perform the merging locally on each mapper before 
1382          sending results to a reducer, similarly to a "combiner" in MapReduce. 
1383   
1384          >>> from operator import add 
1385          >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 
1386          >>> sorted(rdd.reduceByKeyLocally(add).items()) 
1387          [('a', 2), ('b', 1)] 
1388          """ 
1389          def reducePartition(iterator): 
1390              m = {} 
1391              for (k, v) in iterator: 
1392                  m[k] = v if k not in m else func(m[k], v) 
1393              yield m 
 1394   
1395          def mergeMaps(m1, m2): 
1396              for (k, v) in m2.iteritems(): 
1397                  m1[k] = v if k not in m1 else func(m1[k], v) 
1398              return m1 
1399          return self.mapPartitions(reducePartition).reduce(mergeMaps) 
1400   
1402          """ 
1403          Count the number of elements for each key, and return the result to the 
1404          master as a dictionary. 
1405   
1406          >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 
1407          >>> sorted(rdd.countByKey().items()) 
1408          [('a', 2), ('b', 1)] 
1409          """ 
1410          return self.map(lambda x: x[0]).countByValue() 
 1411   
1412 -    def join(self, other, numPartitions=None): 
 1413          """ 
1414          Return an RDD containing all pairs of elements with matching keys in 
1415          C{self} and C{other}. 
1416   
1417          Each pair of elements will be returned as a (k, (v1, v2)) tuple, where 
1418          (k, v1) is in C{self} and (k, v2) is in C{other}. 
1419   
1420          Performs a hash join across the cluster. 
1421   
1422          >>> x = sc.parallelize([("a", 1), ("b", 4)]) 
1423          >>> y = sc.parallelize([("a", 2), ("a", 3)]) 
1424          >>> sorted(x.join(y).collect()) 
1425          [('a', (1, 2)), ('a', (1, 3))] 
1426          """ 
1427          return python_join(self, other, numPartitions) 
 1428   
1430          """ 
1431          Perform a left outer join of C{self} and C{other}. 
1432   
1433          For each element (k, v) in C{self}, the resulting RDD will either 
1434          contain all pairs (k, (v, w)) for w in C{other}, or the pair 
1435          (k, (v, None)) if no elements in other have key k. 
1436   
1437          Hash-partitions the resulting RDD into the given number of partitions. 
1438   
1439          >>> x = sc.parallelize([("a", 1), ("b", 4)]) 
1440          >>> y = sc.parallelize([("a", 2)]) 
1441          >>> sorted(x.leftOuterJoin(y).collect()) 
1442          [('a', (1, 2)), ('b', (4, None))] 
1443          """ 
1444          return python_left_outer_join(self, other, numPartitions) 
 1445   
1447          """ 
1448          Perform a right outer join of C{self} and C{other}. 
1449   
1450          For each element (k, w) in C{other}, the resulting RDD will either 
1451          contain all pairs (k, (v, w)) for v in this, or the pair (k, (None, w)) 
1452          if no elements in C{self} have key k. 
1453   
1454          Hash-partitions the resulting RDD into the given number of partitions. 
1455   
1456          >>> x = sc.parallelize([("a", 1), ("b", 4)]) 
1457          >>> y = sc.parallelize([("a", 2)]) 
1458          >>> sorted(y.rightOuterJoin(x).collect()) 
1459          [('a', (2, 1)), ('b', (None, 4))] 
1460          """ 
1461          return python_right_outer_join(self, other, numPartitions) 
 1462   
1463       
1464       
1465       
1466 -    def partitionBy(self, numPartitions, partitionFunc=portable_hash): 
 1467          """ 
1468          Return a copy of the RDD partitioned using the specified partitioner. 
1469   
1470          >>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x)) 
1471          >>> sets = pairs.partitionBy(2).glom().collect() 
1472          >>> set(sets[0]).intersection(set(sets[1])) 
1473          set([]) 
1474          """ 
1475          if numPartitions is None: 
1476              numPartitions = self._defaultReducePartitions() 
1477   
1478           
1479           
1480           
1481           
1482           
1483           
1484          outputSerializer = self.ctx._unbatched_serializer 
1485   
1486          limit = (_parse_memory(self.ctx._conf.get( 
1487              "spark.python.worker.memory", "512m")) / 2) 
1488   
1489          def add_shuffle_key(split, iterator): 
1490   
1491              buckets = defaultdict(list) 
1492              c, batch = 0, min(10 * numPartitions, 1000) 
1493   
1494              for (k, v) in iterator: 
1495                  buckets[partitionFunc(k) % numPartitions].append((k, v)) 
1496                  c += 1 
1497   
1498                   
1499                  if (c % 1000 == 0 and get_used_memory() > limit 
1500                          or c > batch): 
1501                      n, size = len(buckets), 0 
1502                      for split in buckets.keys(): 
1503                          yield pack_long(split) 
1504                          d = outputSerializer.dumps(buckets[split]) 
1505                          del buckets[split] 
1506                          yield d 
1507                          size += len(d) 
1508   
1509                      avg = (size / n) >> 20 
1510                       
1511                      if avg < 1: 
1512                          batch *= 1.5 
1513                      elif avg > 10: 
1514                          batch = max(batch / 1.5, 1) 
1515                      c = 0 
1516   
1517              for (split, items) in buckets.iteritems(): 
1518                  yield pack_long(split) 
1519                  yield outputSerializer.dumps(items) 
 1520   
1521          keyed = self.mapPartitionsWithIndex(add_shuffle_key) 
1522          keyed._bypass_serializer = True 
1523          with _JavaStackTrace(self.context) as st: 
1524              pairRDD = self.ctx._jvm.PairwiseRDD( 
1525                  keyed._jrdd.rdd()).asJavaPairRDD() 
1526              partitioner = self.ctx._jvm.PythonPartitioner(numPartitions, 
1527                                                            id(partitionFunc)) 
1528          jrdd = pairRDD.partitionBy(partitioner).values() 
1529          rdd = RDD(jrdd, self.ctx, BatchedSerializer(outputSerializer)) 
1530           
1531           
1532          rdd._partitionFunc = partitionFunc 
1533          return rdd 
1534   
1535       
1536 -    def combineByKey(self, createCombiner, mergeValue, mergeCombiners, 
1537                       numPartitions=None): 
 1538          """ 
1539          Generic function to combine the elements for each key using a custom 
1540          set of aggregation functions. 
1541   
1542          Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined 
1543          type" C.  Note that V and C can be different -- for example, one might 
1544          group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]). 
1545   
1546          Users provide three functions: 
1547   
1548              - C{createCombiner}, which turns a V into a C (e.g., creates 
1549                a one-element list) 
1550              - C{mergeValue}, to merge a V into a C (e.g., adds it to the end of 
1551                a list) 
1552              - C{mergeCombiners}, to combine two C's into a single one. 
1553   
1554          In addition, users can control the partitioning of the output RDD. 
1555   
1556          >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 
1557          >>> def f(x): return x 
1558          >>> def add(a, b): return a + str(b) 
1559          >>> sorted(x.combineByKey(str, add, add).collect()) 
1560          [('a', '11'), ('b', '1')] 
1561          """ 
1562          if numPartitions is None: 
1563              numPartitions = self._defaultReducePartitions() 
1564   
1565          serializer = self.ctx.serializer 
1566          spill = (self.ctx._conf.get("spark.shuffle.spill", 'True').lower() 
1567                   == 'true') 
1568          memory = _parse_memory(self.ctx._conf.get( 
1569              "spark.python.worker.memory", "512m")) 
1570          agg = Aggregator(createCombiner, mergeValue, mergeCombiners) 
1571   
1572          def combineLocally(iterator): 
1573              merger = ExternalMerger(agg, memory * 0.9, serializer) \ 
1574                  if spill else InMemoryMerger(agg) 
1575              merger.mergeValues(iterator) 
1576              return merger.iteritems() 
 1577   
1578          locally_combined = self.mapPartitions(combineLocally) 
1579          shuffled = locally_combined.partitionBy(numPartitions) 
1580   
1581          def _mergeCombiners(iterator): 
1582              merger = ExternalMerger(agg, memory, serializer) \ 
1583                  if spill else InMemoryMerger(agg) 
1584              merger.mergeCombiners(iterator) 
1585              return merger.iteritems() 
1586   
1587          return shuffled.mapPartitions(_mergeCombiners) 
1588   
1589 -    def aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None): 
 1590          """ 
1591          Aggregate the values of each key, using given combine functions and a neutral 
1592          "zero value". This function can return a different result type, U, than the type 
1593          of the values in this RDD, V. Thus, we need one operation for merging a V into 
1594          a U and one operation for merging two U's, The former operation is used for merging 
1595          values within a partition, and the latter is used for merging values between 
1596          partitions. To avoid memory allocation, both of these functions are 
1597          allowed to modify and return their first argument instead of creating a new U. 
1598          """ 
1599          def createZero(): 
1600              return copy.deepcopy(zeroValue) 
 1601   
1602          return self.combineByKey( 
1603              lambda v: seqFunc(createZero(), v), seqFunc, combFunc, numPartitions) 
1604   
1605 -    def foldByKey(self, zeroValue, func, numPartitions=None): 
 1606          """ 
1607          Merge the values for each key using an associative function "func" 
1608          and a neutral "zeroValue" which may be added to the result an 
1609          arbitrary number of times, and must not change the result 
1610          (e.g., 0 for addition, or 1 for multiplication.). 
1611   
1612          >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 
1613          >>> from operator import add 
1614          >>> rdd.foldByKey(0, add).collect() 
1615          [('a', 2), ('b', 1)] 
1616          """ 
1617          def createZero(): 
1618              return copy.deepcopy(zeroValue) 
 1619   
1620          return self.combineByKey(lambda v: func(createZero(), v), func, func, numPartitions) 
1621   
1622       
1624          """ 
1625          Group the values for each key in the RDD into a single sequence. 
1626          Hash-partitions the resulting RDD with into numPartitions partitions. 
1627   
1628          Note: If you are grouping in order to perform an aggregation (such as a 
1629          sum or average) over each key, using reduceByKey will provide much 
1630          better performance. 
1631   
1632          >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 
1633          >>> map((lambda (x,y): (x, list(y))), sorted(x.groupByKey().collect())) 
1634          [('a', [1, 1]), ('b', [1])] 
1635          """ 
1636   
1637          def createCombiner(x): 
1638              return [x] 
 1639   
1640          def mergeValue(xs, x): 
1641              xs.append(x) 
1642              return xs 
1643   
1644          def mergeCombiners(a, b): 
1645              a.extend(b) 
1646              return a 
1647   
1648          return self.combineByKey(createCombiner, mergeValue, mergeCombiners, 
1649                                   numPartitions).mapValues(lambda x: ResultIterable(x)) 
1650   
1651       
1653          """ 
1654          Pass each value in the key-value pair RDD through a flatMap function 
1655          without changing the keys; this also retains the original RDD's 
1656          partitioning. 
1657   
1658          >>> x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])]) 
1659          >>> def f(x): return x 
1660          >>> x.flatMapValues(f).collect() 
1661          [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')] 
1662          """ 
1663          flat_map_fn = lambda (k, v): ((k, x) for x in f(v)) 
1664          return self.flatMap(flat_map_fn, preservesPartitioning=True) 
 1665   
1667          """ 
1668          Pass each value in the key-value pair RDD through a map function 
1669          without changing the keys; this also retains the original RDD's 
1670          partitioning. 
1671   
1672          >>> x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])]) 
1673          >>> def f(x): return len(x) 
1674          >>> x.mapValues(f).collect() 
1675          [('a', 3), ('b', 1)] 
1676          """ 
1677          map_values_fn = lambda (k, v): (k, f(v)) 
1678          return self.map(map_values_fn, preservesPartitioning=True) 
 1679   
1681          """ 
1682          Alias for cogroup but with support for multiple RDDs. 
1683   
1684          >>> w = sc.parallelize([("a", 5), ("b", 6)]) 
1685          >>> x = sc.parallelize([("a", 1), ("b", 4)]) 
1686          >>> y = sc.parallelize([("a", 2)]) 
1687          >>> z = sc.parallelize([("b", 42)]) 
1688          >>> map((lambda (x,y): (x, (list(y[0]), list(y[1]), list(y[2]), list(y[3])))), \ 
1689                  sorted(list(w.groupWith(x, y, z).collect()))) 
1690          [('a', ([5], [1], [2], [])), ('b', ([6], [4], [], [42]))] 
1691   
1692          """ 
1693          return python_cogroup((self, other) + others, numPartitions=None) 
 1694   
1695       
1696 -    def cogroup(self, other, numPartitions=None): 
 1697          """ 
1698          For each key k in C{self} or C{other}, return a resulting RDD that 
1699          contains a tuple with the list of values for that key in C{self} as 
1700          well as C{other}. 
1701   
1702          >>> x = sc.parallelize([("a", 1), ("b", 4)]) 
1703          >>> y = sc.parallelize([("a", 2)]) 
1704          >>> map((lambda (x,y): (x, (list(y[0]), list(y[1])))), sorted(list(x.cogroup(y).collect()))) 
1705          [('a', ([1], [2])), ('b', ([4], []))] 
1706          """ 
1707          return python_cogroup((self, other), numPartitions) 
 1708   
1709 -    def sampleByKey(self, withReplacement, fractions, seed=None): 
 1710          """ 
1711          Return a subset of this RDD sampled by key (via stratified sampling). 
1712          Create a sample of this RDD using variable sampling rates for 
1713          different keys as specified by fractions, a key to sampling rate map. 
1714   
1715          >>> fractions = {"a": 0.2, "b": 0.1} 
1716          >>> rdd = sc.parallelize(fractions.keys()).cartesian(sc.parallelize(range(0, 1000))) 
1717          >>> sample = dict(rdd.sampleByKey(False, fractions, 2).groupByKey().collect()) 
1718          >>> 100 < len(sample["a"]) < 300 and 50 < len(sample["b"]) < 150 
1719          True 
1720          >>> max(sample["a"]) <= 999 and min(sample["a"]) >= 0 
1721          True 
1722          >>> max(sample["b"]) <= 999 and min(sample["b"]) >= 0 
1723          True 
1724          """ 
1725          for fraction in fractions.values(): 
1726              assert fraction >= 0.0, "Negative fraction value: %s" % fraction 
1727          return self.mapPartitionsWithIndex( 
1728              RDDStratifiedSampler(withReplacement, fractions, seed).func, True) 
 1729   
1731          """ 
1732          Return each (key, value) pair in C{self} that has no pair with matching 
1733          key in C{other}. 
1734   
1735          >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)]) 
1736          >>> y = sc.parallelize([("a", 3), ("c", None)]) 
1737          >>> sorted(x.subtractByKey(y).collect()) 
1738          [('b', 4), ('b', 5)] 
1739          """ 
1740          def filter_func((key, vals)): 
1741              return len(vals[0]) > 0 and len(vals[1]) == 0 
 1742          map_func = lambda (key, vals): [(key, val) for val in vals[0]] 
1743          return self.cogroup(other, numPartitions).filter(filter_func).flatMap(map_func) 
1744   
1745 -    def subtract(self, other, numPartitions=None): 
 1746          """ 
1747          Return each value in C{self} that is not contained in C{other}. 
1748   
1749          >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)]) 
1750          >>> y = sc.parallelize([("a", 3), ("c", None)]) 
1751          >>> sorted(x.subtract(y).collect()) 
1752          [('a', 1), ('b', 4), ('b', 5)] 
1753          """ 
1754           
1755          rdd = other.map(lambda x: (x, True)) 
1756          return self.map(lambda x: (x, True)).subtractByKey(rdd).map(lambda tpl: tpl[0]) 
 1757   
1759          """ 
1760          Creates tuples of the elements in this RDD by applying C{f}. 
1761   
1762          >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x) 
1763          >>> y = sc.parallelize(zip(range(0,5), range(0,5))) 
1764          >>> map((lambda (x,y): (x, (list(y[0]), (list(y[1]))))), sorted(x.cogroup(y).collect())) 
1765          [(0, ([0], [0])), (1, ([1], [1])), (2, ([], [2])), (3, ([], [3])), (4, ([2], [4]))] 
1766          """ 
1767          return self.map(lambda x: (f(x), x)) 
 1768   
1770          """ 
1771           Return a new RDD that has exactly numPartitions partitions. 
1772   
1773           Can increase or decrease the level of parallelism in this RDD. 
1774           Internally, this uses a shuffle to redistribute data. 
1775           If you are decreasing the number of partitions in this RDD, consider 
1776           using `coalesce`, which can avoid performing a shuffle. 
1777   
1778           >>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4) 
1779           >>> sorted(rdd.glom().collect()) 
1780           [[1], [2, 3], [4, 5], [6, 7]] 
1781           >>> len(rdd.repartition(2).glom().collect()) 
1782           2 
1783           >>> len(rdd.repartition(10).glom().collect()) 
1784           10 
1785          """ 
1786          jrdd = self._jrdd.repartition(numPartitions) 
1787          return RDD(jrdd, self.ctx, self._jrdd_deserializer) 
 1788   
1789 -    def coalesce(self, numPartitions, shuffle=False): 
 1790          """ 
1791          Return a new RDD that is reduced into `numPartitions` partitions. 
1792   
1793          >>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect() 
1794          [[1], [2, 3], [4, 5]] 
1795          >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect() 
1796          [[1, 2, 3, 4, 5]] 
1797          """ 
1798          jrdd = self._jrdd.coalesce(numPartitions) 
1799          return RDD(jrdd, self.ctx, self._jrdd_deserializer) 
 1800   
1801 -    def zip(self, other): 
 1802          """ 
1803          Zips this RDD with another one, returning key-value pairs with the 
1804          first element in each RDD second element in each RDD, etc. Assumes 
1805          that the two RDDs have the same number of partitions and the same 
1806          number of elements in each partition (e.g. one was made through 
1807          a map on the other). 
1808   
1809          >>> x = sc.parallelize(range(0,5)) 
1810          >>> y = sc.parallelize(range(1000, 1005)) 
1811          >>> x.zip(y).collect() 
1812          [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)] 
1813          """ 
1814          if self.getNumPartitions() != other.getNumPartitions(): 
1815              raise ValueError("Can only zip with RDD which has the same number of partitions") 
1816   
1817          def get_batch_size(ser): 
1818              if isinstance(ser, BatchedSerializer): 
1819                  return ser.batchSize 
1820              return 0 
 1821   
1822          def batch_as(rdd, batchSize): 
1823              ser = rdd._jrdd_deserializer 
1824              if isinstance(ser, BatchedSerializer): 
1825                  ser = ser.serializer 
1826              return rdd._reserialize(BatchedSerializer(ser, batchSize)) 
1827   
1828          my_batch = get_batch_size(self._jrdd_deserializer) 
1829          other_batch = get_batch_size(other._jrdd_deserializer) 
1830          if my_batch != other_batch: 
1831               
1832              if my_batch > other_batch: 
1833                  other = batch_as(other, my_batch) 
1834              else: 
1835                  self = batch_as(self, other_batch) 
1836   
1837           
1838           
1839          pairRDD = self._jrdd.zip(other._jrdd) 
1840          deserializer = PairDeserializer(self._jrdd_deserializer, 
1841                                          other._jrdd_deserializer) 
1842          return RDD(pairRDD, self.ctx, deserializer) 
1843   
1845          """ 
1846          Zips this RDD with its element indices. 
1847   
1848          The ordering is first based on the partition index and then the 
1849          ordering of items within each partition. So the first item in 
1850          the first partition gets index 0, and the last item in the last 
1851          partition receives the largest index. 
1852   
1853          This method needs to trigger a spark job when this RDD contains 
1854          more than one partitions. 
1855   
1856          >>> sc.parallelize(["a", "b", "c", "d"], 3).zipWithIndex().collect() 
1857          [('a', 0), ('b', 1), ('c', 2), ('d', 3)] 
1858          """ 
1859          starts = [0] 
1860          if self.getNumPartitions() > 1: 
1861              nums = self.mapPartitions(lambda it: [sum(1 for i in it)]).collect() 
1862              for i in range(len(nums) - 1): 
1863                  starts.append(starts[-1] + nums[i]) 
1864   
1865          def func(k, it): 
1866              for i, v in enumerate(it, starts[k]): 
1867                  yield v, i 
 1868   
1869          return self.mapPartitionsWithIndex(func) 
1870   
1872          """ 
1873          Zips this RDD with generated unique Long ids. 
1874   
1875          Items in the kth partition will get ids k, n+k, 2*n+k, ..., where 
1876          n is the number of partitions. So there may exist gaps, but this 
1877          method won't trigger a spark job, which is different from 
1878          L{zipWithIndex} 
1879   
1880          >>> sc.parallelize(["a", "b", "c", "d", "e"], 3).zipWithUniqueId().collect() 
1881          [('a', 0), ('b', 1), ('c', 4), ('d', 2), ('e', 5)] 
1882          """ 
1883          n = self.getNumPartitions() 
1884   
1885          def func(k, it): 
1886              for i, v in enumerate(it): 
1887                  yield v, i * n + k 
 1888   
1889          return self.mapPartitionsWithIndex(func) 
1890   
1892          """ 
1893          Return the name of this RDD. 
1894          """ 
1895          name_ = self._jrdd.name() 
1896          if not name_: 
1897              return None 
1898          return name_.encode('utf-8') 
 1899   
1901          """ 
1902          Assign a name to this RDD. 
1903   
1904          >>> rdd1 = sc.parallelize([1,2]) 
1905          >>> rdd1.setName('RDD1') 
1906          >>> rdd1.name() 
1907          'RDD1' 
1908          """ 
1909          self._jrdd.setName(name) 
 1910   
1912          """ 
1913          A description of this RDD and its recursive dependencies for debugging. 
1914          """ 
1915          debug_string = self._jrdd.toDebugString() 
1916          if not debug_string: 
1917              return None 
1918          return debug_string.encode('utf-8') 
 1919   
1921          """ 
1922          Get the RDD's current storage level. 
1923   
1924          >>> rdd1 = sc.parallelize([1,2]) 
1925          >>> rdd1.getStorageLevel() 
1926          StorageLevel(False, False, False, False, 1) 
1927          >>> print(rdd1.getStorageLevel()) 
1928          Serialized 1x Replicated 
1929          """ 
1930          java_storage_level = self._jrdd.getStorageLevel() 
1931          storage_level = StorageLevel(java_storage_level.useDisk(), 
1932                                       java_storage_level.useMemory(), 
1933                                       java_storage_level.useOffHeap(), 
1934                                       java_storage_level.deserialized(), 
1935                                       java_storage_level.replication()) 
1936          return storage_level 
 1937   
1939          """ 
1940          Returns the default number of partitions to use during reduce tasks (e.g., groupBy). 
1941          If spark.default.parallelism is set, then we'll use the value from SparkContext 
1942          defaultParallelism, otherwise we'll use the number of partitions in this RDD. 
1943   
1944          This mirrors the behavior of the Scala Partitioner#defaultPartitioner, intended to reduce 
1945          the likelihood of OOMs. Once PySpark adopts Partitioner-based APIs, this behavior will 
1946          be inherent. 
1947          """ 
1948          if self.ctx._conf.contains("spark.default.parallelism"): 
1949              return self.ctx.defaultParallelism 
1950          else: 
1951              return self.getNumPartitions() 
 1952   
1960   
1961      """ 
1962      Pipelined maps: 
1963   
1964      >>> rdd = sc.parallelize([1, 2, 3, 4]) 
1965      >>> rdd.map(lambda x: 2 * x).cache().map(lambda x: 2 * x).collect() 
1966      [4, 8, 12, 16] 
1967      >>> rdd.map(lambda x: 2 * x).map(lambda x: 2 * x).collect() 
1968      [4, 8, 12, 16] 
1969   
1970      Pipelined reduces: 
1971      >>> from operator import add 
1972      >>> rdd.map(lambda x: 2 * x).reduce(add) 
1973      20 
1974      >>> rdd.flatMap(lambda x: [x, x]).reduce(add) 
1975      20 
1976      """ 
1977   
1978 -    def __init__(self, prev, func, preservesPartitioning=False): 
 1979          if not isinstance(prev, PipelinedRDD) or not prev._is_pipelinable(): 
1980               
1981              self.func = func 
1982              self.preservesPartitioning = preservesPartitioning 
1983              self._prev_jrdd = prev._jrdd 
1984              self._prev_jrdd_deserializer = prev._jrdd_deserializer 
1985          else: 
1986              prev_func = prev.func 
1987   
1988              def pipeline_func(split, iterator): 
1989                  return func(split, prev_func(split, iterator)) 
 1990              self.func = pipeline_func 
1991              self.preservesPartitioning = \ 
1992                  prev.preservesPartitioning and preservesPartitioning 
1993              self._prev_jrdd = prev._prev_jrdd   
1994              self._prev_jrdd_deserializer = prev._prev_jrdd_deserializer 
1995          self.is_cached = False 
1996          self.is_checkpointed = False 
1997          self.ctx = prev.ctx 
1998          self.prev = prev 
1999          self._jrdd_val = None 
2000          self._jrdd_deserializer = self.ctx.serializer 
2001          self._bypass_serializer = False 
 2002   
2003      @property 
2005          if self._jrdd_val: 
2006              return self._jrdd_val 
2007          if self._bypass_serializer: 
2008              self._jrdd_deserializer = NoOpSerializer() 
2009          command = (self.func, self._prev_jrdd_deserializer, 
2010                     self._jrdd_deserializer) 
2011          ser = CloudPickleSerializer() 
2012          pickled_command = ser.dumps(command) 
2013          broadcast_vars = ListConverter().convert( 
2014              [x._jbroadcast for x in self.ctx._pickled_broadcast_vars], 
2015              self.ctx._gateway._gateway_client) 
2016          self.ctx._pickled_broadcast_vars.clear() 
2017          env = MapConverter().convert(self.ctx.environment, 
2018                                       self.ctx._gateway._gateway_client) 
2019          includes = ListConverter().convert(self.ctx._python_includes, 
2020                                             self.ctx._gateway._gateway_client) 
2021          python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(), 
2022                                               bytearray(pickled_command), 
2023                                               env, includes, self.preservesPartitioning, 
2024                                               self.ctx.pythonExec, 
2025                                               broadcast_vars, self.ctx._javaAccumulator) 
2026          self._jrdd_val = python_rdd.asJavaRDD() 
2027          return self._jrdd_val 
 2028   
2030          return not (self.is_cached or self.is_checkpointed) 
 2031   
2034      import doctest 
2035      from pyspark.context import SparkContext 
2036      globs = globals().copy() 
2037       
2038       
2039      globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2) 
2040      (failure_count, test_count) = doctest.testmod( 
2041          globs=globs, optionflags=doctest.ELLIPSIS) 
2042      globs['sc'].stop() 
2043      if failure_count: 
2044          exit(-1) 
 2045   
2046   
2047  if __name__ == "__main__": 
2048      _test() 
2049