In [1]:
df = spark.read.csv('bank.csv', inferSchema=True, header=True)
df.printSchema()
root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)
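
With `inferSchema=True`, Spark makes an extra pass over the file to guess the types. A minimal sketch of an alternative read that declares the schema up front instead, with the column types taken from the printout above:

In [ ]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

int_cols = {'age', 'balance', 'day', 'duration', 'campaign', 'pdays', 'previous'}
all_cols = ['age', 'job', 'marital', 'education', 'default', 'balance', 'housing',
            'loan', 'contact', 'day', 'month', 'duration', 'campaign', 'pdays',
            'previous', 'poutcome', 'deposit']
# Build the StructType programmatically instead of listing 17 StructFields by hand
bank_schema = StructType([
    StructField(c, IntegerType() if c in int_cols else StringType(), True)
    for c in all_cols
])
df = spark.read.csv('bank.csv', schema=bank_schema, header=True)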

In [2]:
# Keep every column except day and month
df = df.select('age', 'job', 'marital', 'education', 'default', 'balance', 'housing',
               'loan', 'contact', 'duration', 'campaign', 'pdays', 'previous',
               'poutcome', 'deposit')
cols = df.columns
df.printSchema()
root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)

In [3]:
print("total records",df.count())
# checking the distinct values in string columns of the dataframe using functional programming 
#[print("column name",df[t[0]].name,df.select(t[0]).distinct().show()) for t in df.dtypes if t[1]=='string' ]
#[print("column name",t[0],"distinct values") for t in df.dtypes if t[1]=='string' ]
for t in df.dtypes:
   if t[1]=='string':
     print("column name",t[0])
     print(df.select(t[0]).distinct().show())
total records 11162
column name job
+-------------+
|          job|
+-------------+
|       admin.|
|   technician|
|     services|
|   management|
|      retired|
|  blue-collar|
|   unemployed|
| entrepreneur|
|    housemaid|
|      unknown|
|self-employed|
|      student|
+-------------+

column name marital
+--------+
| marital|
+--------+
| married|
|  single|
|divorced|
+--------+

column name education
+---------+
|education|
+---------+
|secondary|
| tertiary|
|  primary|
|  unknown|
+---------+

column name default
+-------+
|default|
+-------+
|     no|
|    yes|
+-------+

column name housing
+-------+
|housing|
+-------+
|    yes|
|     no|
+-------+

column name loan
+----+
|loan|
+----+
|  no|
| yes|
+----+

column name contact
+---------+
|  contact|
+---------+
|  unknown|
| cellular|
|telephone|
+---------+

column name poutcome
+--------+
|poutcome|
+--------+
| unknown|
|   other|
| failure|
| success|
+--------+

column name deposit
+-------+
|deposit|
+-------+
|    yes|
|     no|
+-------+

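The loop above launches one Spark job per column. A sketch that collects all the distinct counts in a single aggregation instead:

In [ ]:
from pyspark.sql import functions as F

string_cols = [name for name, dtype in df.dtypes if dtype == 'string']
# One pass over the data: count distinct values of every string column at once
df.agg(*[F.countDistinct(c).alias(c) for c in string_cols]).show()
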
In [4]:
# Now we will apply string indexing to the categorical variables.
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

categoricalColumns = ['job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', 'poutcome']
# One StringIndexer per categorical column; let the Pipeline do the fitting
indexers = [StringIndexer(inputCol=c, outputCol=c + "_index") for c in categoricalColumns]
pipeline = Pipeline(stages=indexers)
indexerModel = pipeline.fit(df)
df_r = indexerModel.transform(df)

df_r.show()
+---+-----------+--------+---------+-------+-------+-------+----+-------+--------+--------+-----+--------+--------+-------+---------+-------------+---------------+-------------+-------------+----------+-------------+--------------+
|age|        job| marital|education|default|balance|housing|loan|contact|duration|campaign|pdays|previous|poutcome|deposit|job_index|marital_index|education_index|default_index|housing_index|loan_index|contact_index|poutcome_index|
+---+-----------+--------+---------+-------+-------+-------+----+-------+--------+--------+-----+--------+--------+-------+---------+-------------+---------------+-------------+-------------+----------+-------------+--------------+
| 59|     admin.| married|secondary|     no|   2343|    yes|  no|unknown|    1042|       1|   -1|       0| unknown|    yes|      3.0|          0.0|            0.0|          0.0|          1.0|       0.0|          1.0|           0.0|
| 56|     admin.| married|secondary|     no|     45|     no|  no|unknown|    1467|       1|   -1|       0| unknown|    yes|      3.0|          0.0|            0.0|          0.0|          0.0|       0.0|          1.0|           0.0|
| 41| technician| married|secondary|     no|   1270|    yes|  no|unknown|    1389|       1|   -1|       0| unknown|    yes|      2.0|          0.0|            0.0|          0.0|          1.0|       0.0|          1.0|           0.0|
| 55|   services| married|secondary|     no|   2476|    yes|  no|unknown|     579|       1|   -1|       0| unknown|    yes|      4.0|          0.0|            0.0|          0.0|          1.0|       0.0|          1.0|           0.0|
| 54|     admin.| married| tertiary|     no|    184|     no|  no|unknown|     673|       2|   -1|       0| unknown|    yes|      3.0|          0.0|            1.0|          0.0|          0.0|       0.0|          1.0|           0.0|
| 42| management|  single| tertiary|     no|      0|    yes| yes|unknown|     562|       2|   -1|       0| unknown|    yes|      0.0|          1.0|            1.0|          0.0|          1.0|       1.0|          1.0|           0.0|
| 56| management| married| tertiary|     no|    830|    yes| yes|unknown|    1201|       1|   -1|       0| unknown|    yes|      0.0|          0.0|            1.0|          0.0|          1.0|       1.0|          1.0|           0.0|
| 60|    retired|divorced|secondary|     no|    545|    yes|  no|unknown|    1030|       1|   -1|       0| unknown|    yes|      5.0|          2.0|            0.0|          0.0|          1.0|       0.0|          1.0|           0.0|
| 37| technician| married|secondary|     no|      1|    yes|  no|unknown|     608|       1|   -1|       0| unknown|    yes|      2.0|          0.0|            0.0|          0.0|          1.0|       0.0|          1.0|           0.0|
| 28|   services|  single|secondary|     no|   5090|    yes|  no|unknown|    1297|       3|   -1|       0| unknown|    yes|      4.0|          1.0|            0.0|          0.0|          1.0|       0.0|          1.0|           0.0|
| 38|     admin.|  single|secondary|     no|    100|    yes|  no|unknown|     786|       1|   -1|       0| unknown|    yes|      3.0|          1.0|            0.0|          0.0|          1.0|       0.0|          1.0|           0.0|
| 30|blue-collar| married|secondary|     no|    309|    yes|  no|unknown|    1574|       2|   -1|       0| unknown|    yes|      1.0|          0.0|            0.0|          0.0|          1.0|       0.0|          1.0|           0.0|
| 29| management| married| tertiary|     no|    199|    yes| yes|unknown|    1689|       4|   -1|       0| unknown|    yes|      0.0|          0.0|            1.0|          0.0|          1.0|       1.0|          1.0|           0.0|
| 46|blue-collar|  single| tertiary|     no|    460|    yes|  no|unknown|    1102|       2|   -1|       0| unknown|    yes|      1.0|          1.0|            1.0|          0.0|          1.0|       0.0|          1.0|           0.0|
| 31| technician|  single| tertiary|     no|    703|    yes|  no|unknown|     943|       2|   -1|       0| unknown|    yes|      2.0|          1.0|            1.0|          0.0|          1.0|       0.0|          1.0|           0.0|
| 35| management|divorced| tertiary|     no|   3837|    yes|  no|unknown|    1084|       1|   -1|       0| unknown|    yes|      0.0|          2.0|            1.0|          0.0|          1.0|       0.0|          1.0|           0.0|
| 32|blue-collar|  single|  primary|     no|    611|    yes|  no|unknown|     541|       3|   -1|       0| unknown|    yes|      1.0|          1.0|            2.0|          0.0|          1.0|       0.0|          1.0|           0.0|
| 49|   services| married|secondary|     no|     -8|    yes|  no|unknown|    1119|       1|   -1|       0| unknown|    yes|      4.0|          0.0|            0.0|          0.0|          1.0|       0.0|          1.0|           0.0|
| 41|     admin.| married|secondary|     no|     55|    yes|  no|unknown|    1120|       2|   -1|       0| unknown|    yes|      3.0|          0.0|            0.0|          0.0|          1.0|       0.0|          1.0|           0.0|
| 49|     admin.|divorced|secondary|     no|    168|    yes| yes|unknown|     513|       1|   -1|       0| unknown|    yes|      3.0|          2.0|            0.0|          0.0|          1.0|       1.0|          1.0|           0.0|
+---+-----------+--------+---------+-------+-------+-------+----+-------+--------+--------+-----+--------+--------+-------+---------+-------------+---------------+-------------+-------------+----------+-------------+--------------+
only showing top 20 rows
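
StringIndexer assigns indices by descending frequency, so 0.0 is always the most common category in a column. A short sketch to inspect the learned index-to-label mappings, reusing the indexerModel fitted above:

In [ ]:
for stage in indexerModel.stages:
    # Each stage is a StringIndexerModel; labels are ordered by frequency
    print(stage.getOutputCol(), '->', list(stage.labels))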

In [5]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
categoricalColumns = ['job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', 'poutcome']
stages = []
for categoricalCol in categoricalColumns:
    # Index each categorical column, then one-hot encode the index
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + 'Index')
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + 'classVec'])
    stages += [stringIndexer, encoder]
# Index the target column once, outside the loop
label_stringIdx = StringIndexer(inputCol='deposit', outputCol='label')
stages += [label_stringIdx]
numericCols = ['age', 'balance', 'duration', 'campaign', 'pdays', 'previous']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

# Chain every stage into a single Pipeline and fit it
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df)
df = pipelineModel.transform(df)
selectedCols = ['label', 'features'] + cols
df = df.select(selectedCols)
df.printSchema()
root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)
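
Because the fitted pipeline captures the exact encoding (category indices, one-hot layout, vector order), it is worth persisting it so future data can be scored with the same transformations. A sketch, using a hypothetical save path:

In [ ]:
from pyspark.ml import PipelineModel

# 'bank_feature_pipeline' is a hypothetical path; any local/HDFS path works
pipelineModel.write().overwrite().save('bank_feature_pipeline')
reloaded = PipelineModel.load('bank_feature_pipeline')
# The reloaded model expects the raw columns, so transform the original CSV
scored = reloaded.transform(spark.read.csv('bank.csv', inferSchema=True, header=True))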

In [6]:
import pandas as pd
pd.DataFrame(df.take(5), columns=df.columns).transpose()
Out[6]:
0 1 2 3 4
label 1 1 1 1 1
features (0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... (0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... (0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... (0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, ... (0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...
age 59 56 41 55 54
job admin. admin. technician services admin.
marital married married married married married
education secondary secondary secondary secondary tertiary
default no no no no no
balance 2343 45 1270 2476 184
housing yes no yes yes no
loan no no no no no
contact unknown unknown unknown unknown unknown
duration 1042 1467 1389 579 673
campaign 1 1 1 1 2
pdays -1 -1 -1 -1 -1
previous 0 0 0 0 0
poutcome unknown unknown unknown unknown unknown
deposit yes yes yes yes yes
In [7]:
train, test = df.randomSplit([0.7, 0.3], seed=2018)
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))
Training Dataset Count: 7855
Test Dataset Count: 3307
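
Note that randomSplit is not stratified. If the label proportions must be preserved more tightly, one option is an approximate stratified split with sampleBy; a sketch:

In [ ]:
from pyspark.sql import functions as F

# Sample 70% within each label value, then take the complement via anti-join
with_id = df.withColumn('_row_id', F.monotonically_increasing_id())
train_s = with_id.sampleBy('label', fractions={0.0: 0.7, 1.0: 0.7}, seed=2018)
test_s = with_id.join(train_s.select('_row_id'), on='_row_id', how='left_anti')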
In [8]:
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(featuresCol='features', labelCol='label', maxDepth=3)
dtModel = dt.fit(train)
predictions = dtModel.transform(test)
predictions.select('age', 'job', 'label', 'rawPrediction', 'prediction', 'probability').show(10)
+---+----------+-----+--------------+----------+--------------------+
|age|       job|label| rawPrediction|prediction|         probability|
+---+----------+-----+--------------+----------+--------------------+
| 33|management|  0.0|[2498.0,481.0]|       0.0|[0.83853642161799...|
| 49|management|  0.0|[2498.0,481.0]|       0.0|[0.83853642161799...|
| 52|management|  0.0|[520.0,1931.0]|       1.0|[0.21215830273357...|
| 53|management|  0.0|[2498.0,481.0]|       0.0|[0.83853642161799...|
| 58|management|  0.0|[2498.0,481.0]|       0.0|[0.83853642161799...|
| 32|management|  0.0|[2498.0,481.0]|       0.0|[0.83853642161799...|
| 57|management|  0.0|[2498.0,481.0]|       0.0|[0.83853642161799...|
| 52|management|  0.0|[2498.0,481.0]|       0.0|[0.83853642161799...|
| 46|management|  0.0|[2498.0,481.0]|       0.0|[0.83853642161799...|
| 31|management|  0.0|[2498.0,481.0]|       0.0|[0.83853642161799...|
+---+----------+-----+--------------+----------+--------------------+
only showing top 10 rows
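
For a decision tree, rawPrediction holds the training-instance counts at the leaf a row lands in, and probability is those counts normalized. To report a quick accuracy figure alongside ROC, a sketch using the multiclass evaluator:

In [ ]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_evaluator = MulticlassClassificationEvaluator(
    labelCol='label', predictionCol='prediction', metricName='accuracy')
print("Test accuracy:", acc_evaluator.evaluate(predictions))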

In [9]:
print(dtModel.toDebugString)
DecisionTreeClassificationModel: uid=DecisionTreeClassifier_d200084c2d5a, depth=3, numNodes=9, numClasses=2, numFeatures=30
  If (feature 26 <= 206.5)
   If (feature 23 in {1.0})
    Predict: 1.0
   Else (feature 23 not in {1.0})
    Predict: 0.0
  Else (feature 26 > 206.5)
   If (feature 26 <= 405.5)
    If (feature 20 in {1.0})
     Predict: 0.0
    Else (feature 20 not in {1.0})
     Predict: 1.0
   Else (feature 26 > 405.5)
    Predict: 1.0
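
The rules above refer to features by their position in the assembled vector (features 20 and 23 are one-hot slots, feature 26 is one of the numeric columns). The names can be recovered from the metadata that VectorAssembler attaches to the features column; a sketch:

In [ ]:
# 'attrs' groups features as numeric/binary/nominal; each entry has idx and name
attrs = train.schema['features'].metadata['ml_attr']['attrs']
idx_to_name = {a['idx']: a['name'] for group in attrs.values() for a in group}
for idx in (20, 23, 26):  # the indices used in the tree printout above
    print(idx, '->', idx_to_name[idx])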

In [10]:
# Evaluate our decision tree model.
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))
Test Area Under ROC: 0.7808118726917547
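
Area under the precision-recall curve is a useful complement to ROC, especially when the classes are imbalanced; the same evaluator can compute it:

In [ ]:
print("Test Area Under PR: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderPR"})))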
In [11]:
# Random forest
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(featuresCol='features', labelCol='label')
rfModel = rf.fit(train)
predictions = rfModel.transform(test)
predictions.select('age', 'job', 'label', 'rawPrediction', 'prediction', 'probability').show(10)
evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))
+---+----------+-----+--------------------+----------+--------------------+
|age|       job|label|       rawPrediction|prediction|         probability|
+---+----------+-----+--------------------+----------+--------------------+
| 33|management|  0.0|[15.1502363257935...|       0.0|[0.75751181628967...|
| 49|management|  0.0|[15.0268823941769...|       0.0|[0.75134411970884...|
| 52|management|  0.0|[6.45157175624421...|       1.0|[0.32257858781221...|
| 53|management|  0.0|[14.2305502576427...|       0.0|[0.71152751288213...|
| 58|management|  0.0|[15.6248584129298...|       0.0|[0.78124292064649...|
| 32|management|  0.0|[15.4925318353978...|       0.0|[0.77462659176989...|
| 57|management|  0.0|[14.8285262763956...|       0.0|[0.74142631381978...|
| 52|management|  0.0|[17.1115052569861...|       0.0|[0.85557526284930...|
| 46|management|  0.0|[17.1263883107242...|       0.0|[0.85631941553621...|
| 31|management|  0.0|[15.8795209580104...|       0.0|[0.79397604790052...|
+---+----------+-----+--------------------+----------+--------------------+
only showing top 10 rows

Test Area Under ROC: 0.8807102073869716
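
The random forest clearly improves on the single tree. To see which inputs drive it, the model's featureImportances vector can be paired with the feature names from the assembler metadata, as in the earlier sketch:

In [ ]:
attrs = train.schema['features'].metadata['ml_attr']['attrs']
idx_to_name = {a['idx']: a['name'] for group in attrs.values() for a in group}
# Rank features by importance score and show the top ten
scores = rfModel.featureImportances.toArray()
ranked = sorted(enumerate(scores), key=lambda kv: kv[1], reverse=True)
for idx, score in ranked[:10]:
    print(f"{idx_to_name[idx]}: {score:.4f}")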
In [ ]: