"""
Python package for random data generation.
"""


from pyspark.rdd import RDD
from pyspark.mllib._common import _deserialize_double, _deserialize_double_vector
from pyspark.serializers import NoOpSerializer


class RandomRDDs:
    """
    Generator methods for creating RDDs comprised of i.i.d. samples from
    some distribution.
    """

    @staticmethod
    def normalRDD(sc, size, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of i.i.d. samples from the standard normal
        distribution.

        To transform the distribution in the generated RDD from standard normal
        to some other normal N(mean, sigma^2), use
        C{RandomRDDs.normalRDD(sc, n, p, seed)\
          .map(lambda v: mean + sigma * v)}

        >>> x = RandomRDDs.normalRDD(sc, 1000, seed=1L)
        >>> stats = x.stats()
        >>> stats.count()
        1000L
        >>> abs(stats.mean() - 0.0) < 0.1
        True
        >>> abs(stats.stdev() - 1.0) < 0.1
        True
        """
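        # Generate the samples on the JVM side via PythonMLLibAPI, then
        # deserialize each element's byte payload into a Python float.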
        jrdd = sc._jvm.PythonMLLibAPI().normalRDD(sc._jsc, size, numPartitions, seed)
        normal = RDD(jrdd, sc, NoOpSerializer())
        return normal.map(lambda bytes: _deserialize_double(bytearray(bytes)))

    @staticmethod
    def poissonRDD(sc, mean, size, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of i.i.d. samples from the Poisson
        distribution with the input mean.

        >>> mean = 100.0
        >>> x = RandomRDDs.poissonRDD(sc, mean, 1000, seed=1L)
        >>> stats = x.stats()
        >>> stats.count()
        1000L
        >>> abs(stats.mean() - mean) < 0.5
        True
        >>> from math import sqrt
        >>> abs(stats.stdev() - sqrt(mean)) < 0.5
        True
        """
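        # Same pattern as normalRDD: the JVM generates the doubles and each
        # element is deserialized from its byte payload on the Python side.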
        jrdd = sc._jvm.PythonMLLibAPI().poissonRDD(sc._jsc, mean, size, numPartitions, seed)
        poisson = RDD(jrdd, sc, NoOpSerializer())
        return poisson.map(lambda bytes: _deserialize_double(bytearray(bytes)))

    @staticmethod
    def normalVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of vectors containing i.i.d. samples drawn
        from the standard normal distribution.

        >>> import numpy as np
        >>> mat = np.matrix(RandomRDDs.normalVectorRDD(sc, 100, 100, seed=1L).collect())
        >>> mat.shape
        (100, 100)
        >>> abs(mat.mean() - 0.0) < 0.1
        True
        >>> abs(mat.std() - 1.0) < 0.1
        True
        """
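        # Elements returned from the JVM are serialized vectors, so use the
        # vector deserializer rather than the scalar one.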
        jrdd = sc._jvm.PythonMLLibAPI() \
            .normalVectorRDD(sc._jsc, numRows, numCols, numPartitions, seed)
        normal = RDD(jrdd, sc, NoOpSerializer())
        return normal.map(lambda bytes: _deserialize_double_vector(bytearray(bytes)))

    @staticmethod
    def poissonVectorRDD(sc, mean, numRows, numCols, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of vectors containing i.i.d. samples drawn
        from the Poisson distribution with the input mean.

        >>> import numpy as np
        >>> mean = 100.0
        >>> rdd = RandomRDDs.poissonVectorRDD(sc, mean, 100, 100, seed=1L)
        >>> mat = np.mat(rdd.collect())
        >>> mat.shape
        (100, 100)
        >>> abs(mat.mean() - mean) < 0.5
        True
        >>> from math import sqrt
        >>> abs(mat.std() - sqrt(mean)) < 0.5
        True
        """
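        # mean is the Poisson rate parameter passed through to the JVM
        # generator; elements are deserialized as vectors of doubles.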
        jrdd = sc._jvm.PythonMLLibAPI() \
            .poissonVectorRDD(sc._jsc, mean, numRows, numCols, numPartitions, seed)
        poisson = RDD(jrdd, sc, NoOpSerializer())
        return poisson.map(lambda bytes: _deserialize_double_vector(bytearray(bytes)))


def _test():
    import doctest
    from pyspark.context import SparkContext
    globs = globals().copy()
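    # Use a small batch size so these tiny doctest jobs still span multiple
    # serialization batches.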
    globs['sc'] = SparkContext('local[2]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)


if __name__ == "__main__":
    _test()