2000字范文,分享全网优秀范文,学习好帮手!
2000字范文 > 《大数据+AI在大健康领域中最佳实践前瞻》---- 基于 pyspark + xgboost 算法的 欺诈检测 DEMO实践

《大数据+AI在大健康领域中最佳实践前瞻》---- 基于 pyspark + xgboost 算法的 欺诈检测 DEMO实践

时间:2022-08-25 05:00:31

相关推荐

《大数据+AI在大健康领域中最佳实践前瞻》---- 基于 pyspark + xgboost 算法的 欺诈检测 DEMO实践

文章大纲

欺诈检测一般性处理流程介绍pyspark + xgboost DEMO参考文献

xgboost 和pyspark 如何配置呢? 请参考之前的博文:

使用 WSL 进行pyspark + xgboost 分类+特征重要性 简单实践

银行需要面对数量不断上升的欺诈案件。随着新技术的出现,欺诈事件的实例将会成倍增加,银行很难检查每笔交易并手动识别欺诈模式。RPA使用“if-then”方法识别潜在的欺诈行为并将其标记给相关部门。例如,如果在短时间内进行了多次交易, RPA会识别该账户并将其标记为潜在威胁。这有助于银行仔细审查账户并调查欺诈行为。

欺诈检测一般性处理流程介绍

流程图说明

正如我们在上面看到的,我们接收我们的输入,包括关于金融数据中个人保险索赔的数据(这些包含索赔特征、客户特征和保险特征)。

经过一些预处理和添加新的特征,我们使用数据来训练XGBOOST分类器。

在分类器被训练之后,它可以用来确定新记录是否被接受(不欺诈)或被拒绝(欺诈)。

下面将更详细地描述该过程的流程。

当我们和客户交流后,需要针对每个字段进行理解,客户会给到我们一个数据说明表格:

输入

Our input consists of a dataset with lines for each claim. The claims contain data about the customers, the types of claims, claim amounts, and other relevant features.

Data Preparation (Preprocessing, Generation of Code Features, and Generation of Customer Segmentation Features)

We first do some initial preprocessing to convert the data fields to a suitable format. Then, based on the input, we generate features which characterize the customer based on factors like number of previous claims, number of previous occurences of fraud, total amount claimed, etc. These customer segmentation features are added to the existing dataset along with features that detail the presence (or lack thereof) of Warning Codes, Diagnosis Codes, etc.

我们首先做一些初始的预处理,将数据字段转换成合适的格式。然后,基于输入,我们生成特征,这些特征基于以前索赔次数、以前欺诈发生次数、索赔总额等因素来描述客户。这些客户细分特征与详细说明警告代码存在(或缺乏)的特征一起添加到现有数据集中,诊断代码等。

Preprocessing consists of :

Log Transformation on high magnitude numerical featuresOne-hot encoding of categorical featuresTimestamp transforms on date-time features

Customer Segmentation Features in the data include :

Summation of ORG_PRES_AMT_VALUE for a particular customer until a particular timestamp

Summation of TOTAL_RECEIPT_AMT for a particular customer until a particular timestamp

Summation of APP_AMT

Summation of CL_SOCIAL_PAY_AMT for a particular customer until a particular timestamp

Summation of CL_OWNER_PAY_AMT for a particular customer until a particular timestamp

Summation of REJECTED_AMT

Average of COPAY_PCT for a particular customer until a particular timestamp

Max value of NO_OF_YEAR for a particular customer until a particular timestamp

Number of CL_LINE_STATUS_RJ lines

Number of CL_LINE_STATUS_AC lines

Ratio of CL_LINE_STATUS_RJ lines to CL_LINE_STATUS_AC lines

Number of BEN_SPEND_Applicant lines

Number of BEN_SPEND_Spouse lines

Number of BEN_SPEND_Child lines

Number of BEN_SPEND_Parents lines

Number of lines

Ratio of Total Rejected Amount to Total ORG_PRES_AMT_VALUE

Ratio of CL_LINE_STATUS_RJ lines to all lines

Ratio of CL_LINE_STATUS_AC lines to all lines

Ratio of BEN_SPEND_Applicant lines to all lines

Ratio of BEN_SPEND_Spouse lines to all lines

Ratio of BEN_SPEND_Child lines to all lines

Ratio of BEN_SPEND_Parents lines to all lines

Code Features are generated by selecting a particular code field, determining the top 20 most frequently occuring items in that particular field, and then creating a feature for each of those 20 items, indicating whether they are present in the record or not. The code fields for which we generate Code

Features in the data are :

Warning CodeDIAG_CODEPROV_CODEKIND_CODECLSH_HOSP_CODE

Classification Model :

XGBoost is an implementation of gradient boosted decision trees designed for speed and performance. The implementation of the algorithm was engineered for efficiency of compute time and memory resources. A design goal was to make the best use of available resources to train the model. We are using an XGBoost classifier to determine whether a claim is fraudulent or not.

XGBoost是一个梯度增强决策树的实现,旨在提高速度和性能。算法的实现是为了提高计算时间和内存资源的效率而设计的。设计目标是充分利用现有资源来训练模型。我们使用XGBoost分类器来确定索赔是否具有欺诈性。

输出

The model classifies the claims as Rejected or Accepted, ie, Fraudulent or Not Fraudulent

pyspark + xgboost DEMO

# Imports and Initializationfrom xgboost import XGBClassifier, plot_importancefrom sklearn.model_selection import RandomizedSearchCVfrom pyspark.sql import functions as Ffrom pyspark.sql.types import *from sklearn import preprocessingimport sysimport osimport numpy as npimport pandas as pdfrom sklearn.metrics import precision_recall_curvefrom sklearn.model_selection import train_test_splitfrom sklearn.metrics import average_precision_score, confusion_matrix, accuracy_scorefrom pyspark.sql import SparkSessionfrom pyspark import SparkConffrom pyspark.sql.types import *from pyspark.sql import functions as Ffrom pyspark.storagelevel import StorageLevelimport jsonimport mathimport numbersfrom pyspark.sql import SQLContextfrom pyspark.sql import Windowimport matplotlib.pyplot as pltimport itertoolsfrom sklearn.metrics import roc_curvefrom sklearn.metrics import roc_auc_scoreday_of_week_udf = F.udf(lambda ts: ts.weekday() if ts is not None else None,StringType())def getSummary(df):summarydf = (df.withColumn('is_BEN_TYPE_Applicant',F.when(F.col("BEN_TYPE") == "Applicant", F.lit(1)).otherwise(F.lit(0))).groupby("MBR_NO").agg(F.max("NO_OF_YR").alias("MAX_NO_OF_YR"),F.sum(F.lit(1)).alias("NUM_LINES"),).withColumn("FRAC_REJECTED_AMT", F.col("TOT_REJECTED_AMT")/F.col("TOT_ORG_PRES_AMT_VALUE")).withColumn("FRAC_BEN_TYPE_Applicant", F.col("TOT_is_BEN_TYPE_Applicant")/F.col("NUM_LINES")).withColumn("FRAC_BEN_TYPE_Spouse", F.col("TOT_is_BEN_TYPE_Spouse")/F.col("NUM_LINES")).withColumn("FRAC_BEN_TYPE_Child", F.col("TOT_is_BEN_TYPE_Child")/F.col("NUM_LINES")).withColumn("FRAC_BEN_TYPE_Parent", F.col("TOT_is_BEN_TYPE_Parent")/F.col("NUM_LINES")).persist())return summarydfdef plot_confusion_matrix(cm, classes,normalize=False,title='Confusion matrix',cmap=plt.cm.Blues):"""This function prints and plots the confusion matrix.Normalization can be applied by setting `normalize=True`."""if normalize:cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]print("Normalized confusion matrix")else:print('Confusion matrix, without normalization')print(cm)plt.imshow(cm, interpolation='nearest', cmap=cmap)plt.title(title)plt.colorbar()tick_marks = np.arange(len(classes))plt.xticks(tick_marks, classes, rotation=45)plt.yticks(tick_marks, classes)fmt = '.2f' if normalize else 'd'thresh = cm.max() / 2.for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):plt.text(j, i, format(cm[i, j], fmt),horizontalalignment="center",color="white" if cm[i, j] > thresh else "black")plt.tight_layout()plt.ylabel('True label')plt.xlabel('Predicted label')def checkContain(baseFeatures, allFeatures, transformsList):"""Description : Used to indicate that we want to use the transformed featuresand not the original features if a transform has been doneInput : baseFeatures - Features included in the dataset before anytransforms are appliedallFeatures - All the features present in the dataframe after alltransforms and prep have completedtransformsList - The features from the original dataset that shouldhave transforms applied to themOutput : List of the features we're going to use for the ML model"""resList = []for baseFeat in baseFeatures:if baseFeat not in transformsList:resList.append(baseFeat)else:for feat in allFeatures:if baseFeat in feat:if "~~" in feat or "log10" in feat:resList.append(feat)return resListdef transform_ts_fields(df, ts_cols):"""Description : Produces a timestamp in the standard dow-hod format for thesupplied fieldInput : df - dataframets_cols - timestamp features that need to be formatted correctlyOutput : dataframe with appropriately formatted timestamp features"""col_list = df.columnsfor col in ts_cols:if(col in col_list):df = (df .withColumn(col,F.col(col).cast("timestamp")) .withColumn("{}_dow".format(col),day_of_week_udf(F.col(col))) .withColumn("{}_hod".format(col),F.hour(F.col(col))))return dfdef transform_numeric_fields(df, num_cols):"""Description : Converts all numeric fields into float typeInput : df - dataframenum_cols - numeric features that need to be converted to float typeOutput : dataframe with appropriately numerical features converted to floattype"""col_list = df.columnsfor col in num_cols:if(col in col_list):df = (df.withColumn(col, F.col(col).cast("float")))return dfdef transform_log_fields(df, num_cols):"""Description : Produces the log_10 of the fields passed to itInput : df - dataframenum_cols - numeric features whose log values need to be calculatedOutput : dataframe with added log values for the required numericalfeatures"""col_list = df.columnsfor col in num_cols:if(col in col_list):df = (df.withColumn(col + "_log10", F.log(10.0, F.col(col))))return dfdef with_transform(df, param_dict):"""Description : Applies transforms on relevant data fields in the dataInput : df - dataframeparam_dict - parameter dictionaryOutput : dataframe with all appropriate transforms"""df = transform_ts_fields(df, param_dict['BASE_FEATURES_TIMESTAMP'])df = transform_numeric_fields(df, param_dict['BASE_FEATURES_NUMERIC'])df = transform_log_fields(df, param_dict['LOG_TRANSFORM_FEATURES'])df = (df .withColumn("INCUR_PERIOD_SECS",F.col("INCUR_DATE_TO").cast("long") -F.col("INCUR_DATE_FROM").cast("long")))return dfdef run_xgboost(data,feats, scale_pos_weight=1.0, old_model = None):"""Description : Generates an xgboost model based on training dataInput : X_train_pd - Pandas Dataframe, training data inputy_train - training data output/labelsparam_dict - parameter dictionarymax_depth_list - list of max depths of treesn_estimators_list - list of number of treesscoring_metric - scoring metric usedgrid_scoring - grid scoring metricscale_pos_weight - weight applied to positive valsnum_cv = cross-validation splitting strategyOutput : Trained XGBoost Classifier"""X_train, X_test, y_train, y_test = train_test_split(data[feats], data['label'], test_size=0.33)unique, counts = np.unique(y_train, return_counts=True)cdict = dict(zip(unique, counts))temp_pos_weight = cdict[0]/cdict[1]xgb_class = XGBClassifier(scale_pos_weight=temp_pos_weight)xgb_class.fit(X=X_train, y=y_train, xgb_model = old_model)y_pred_proba = xgb_class.predict_proba(X_test)threshs = np.arange(0.01,1,0.01)acc = 0prsum = 0abdist = 1bestthresh = 0for thresh in threshs:y_pred_temp = (y_pred_proba[:,1] >= thresh).astype(int)'''precision, recall, thresholds = precision_recall_curve(y_test, y_pred_temp)average_precision = average_precision_score(y_test, y_pred_temp)if ((precision[1]+recall[1])>prsum) and (recall[1]>precision[1]):prsum = precision[1]+recall[1]bestthresh = thresh''''''temp_acc = accuracy_score(np.array(y_test), np.array(y_pred_temp))if temp_acc >acc:acc = temp_accbestthresh = thresh'''cnf_matrix_temp = confusion_matrix(y_test, y_pred_temp)cm = cnf_matrix_temp.astype('float') / cnf_matrix_temp.sum(axis=1)[:, np.newaxis]fp = cm[0][1] * 1.0fn = cm[1][0] * 1.0dist = abs((fn/fp)-1)if dist<abdist:abdist = distbestthresh = threshy_pred = (y_pred_proba[:,1] >= bestthresh).astype(int)precision, recall, thresholds = precision_recall_curve(y_test, y_pred)average_precision = average_precision_score(y_test, y_pred)# Compute confusion matrixcnf_matrix = confusion_matrix(y_test, y_pred)np.set_printoptions(precision=2)# Plot non-normalized confusion matrixplt.figure()plot_confusion_matrix(cnf_matrix, classes=[0,1],title='Confusion matrix, without normalization')# Plot normalized confusion matrixplt.figure()plot_confusion_matrix(cnf_matrix, classes=[0,1], normalize=True,title='Normalized confusion matrix')plt.show()plt.step(recall, precision, color='b', alpha=0.2,where='post')plt.fill_between(recall, precision, step='post', alpha=0.2,color='b')plt.xlabel('Recall')plt.ylabel('Precision')plt.ylim([0.0, 1.05])plt.xlim([0.0, 1.0])plt.title('2-class Precision-Recall curve: AP={0:0.5f}'.format(average_precision))plt.show()auc = roc_auc_score(y_test, y_pred_proba[:,1])print('AUC: %.3f' % auc)# calculate roc curvefpr, tpr, thresholds = roc_curve(y_test, y_pred_proba[:,1])# plot no skillplt.plot([0, 1], [0, 1], linestyle='--')# plot the roc curve for the modelplt.plot(fpr, tpr, marker='.')# show the plotplt.show()unique, counts = np.unique(data['label'], return_counts=True)cdict = dict(zip(unique, counts))pos_weight = cdict[0]/cdict[1]full_model = XGBClassifier(scale_pos_weight= pos_weight)full_model.fit(data[feats], data['label'])return full_model, bestthreshdef setup_spark_session(param_dict):"""Description : Used to setup spark sessionInput : param_dict - parameter dictionaryOutput : Spark Session, Spark Context, and SQL Context"""pd.set_option('display.max_rows', 500)pd.set_option('display.max_columns', 500)os.environ["PYSPARK_PYTHON"] = "/home/hadoop/anaconda/envs/playground_py36/bin/python"try:spark.stop()print("Stopped a SparkSession")except Exception as e:print("No existing SparkSession")SPARK_DRIVER_MEMORY = param_dict["SPARK_DRIVER_MEMORY"] # "10G"SPARK_DRIVER_CORE = param_dict["SPARK_DRIVER_CORE"] # "5"SPARK_EXECUTOR_MEMORY = param_dict["SPARK_EXECUTOR_MEMORY"] # "3G"SPARK_EXECUTOR_CORE = param_dict["SPARK_EXECUTOR_CORE"] # "1"AWS_ACCESS_KEY = param_dict["AWS_ACCESS_KEY"] AWS_SECRET_KEY = param_dict["AWS_SECRET_KEY"]AWS_S3_ENDPOINT = param_dict["AWS_S3_ENDPOINT"]conf = SparkConf().\setAppName(param_dict["APP_NAME"]).\setMaster('yarn-client').\set('spark.executor.cores', SPARK_EXECUTOR_CORE).\set('spark.executor.memory', SPARK_EXECUTOR_MEMORY).\set('spark.driver.cores', SPARK_DRIVER_CORE).\set('spark.driver.memory', SPARK_DRIVER_MEMORY).\set('spark.driver.maxResultSize', '0')spark = SparkSession.builder.\config(conf=conf).\getOrCreate()sc = spark.sparkContexthadoop_conf = sc._jsc.hadoopConfiguration()hadoop_conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")hadoop_conf.set("fs.s3a.access.key", AWS_ACCESS_KEY)hadoop_conf.set("fs.s3a.secret.key", AWS_SECRET_KEY)hadoop_conf.set("fs.s3a.endpoint", AWS_S3_ENDPOINT)hadoop_conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2")sqlContext = SQLContext(sc)return spark, sc, sqlContextdef loadDataset(vw_cl_lines_df, datefield, param_dict):"""Description : Runs data through appropriate transforms to convert itto a suitable formatInput : vw_cl_lines_df - input dataframedatefield - field used to establish a window for the addFeatsfunctionparam_dict - parameter dictionaryOutput : Properly formatted dataframe"""vw_cl_lines_df = (with_transform(vw_cl_lines_df, param_dict))vw_cl_lines_df = vw_cl_lines_df.withColumn(datefield + "_unix",(F.unix_timestamp(F.col(datefield),format='yyyy-MM-dd HH:mm:ss.000')))return vw_cl_lines_dfdef addOneHotsTest(df, oneHots):for item in oneHots:field = item.split('~')[0]df[item] = np.where(df[field] == item, 1, 0)return dfdef addCodes(res_df, codeField, topCodes, optUDF, knownPref):# Need to get same codes for testingfor code in topCodes:likeCode = "%" + code + "%"res_df = res_df.withColumn(code,F.when(res_df[codeField].like(likeCode),1).otherwise(0))def checkOtherCodes(x):if not x:return 0x = set(x)if x.issubset(topCodes):return 0else:return 1otherCodesUDF = F.udf(checkOtherCodes, IntegerType())if knownPref is not None:otherlabel = knownPref + "_" + codeFieldelse:otherlabel = codeFieldres_df = res_df.withColumn("OTHER_" + otherlabel,otherCodesUDF(res_df[codeField]))codesAdded = topCodesreturn res_df, codesAddeddef codeExtract(df, codeField, topCount, optUDF=None, knownPref=None):"""Description : Function to extract code featuresInput : df - input dataframecodeField - field used to establish a window for the addFeatsfunctiontopCount - number of code features to be addedoptUDF - optional udf to apply to the fieldknownPref - prefix characterizing a field, if anyOutput : dataframe with code features added"""codeEx_df = dfif optUDF is not None:codeEx_df = codeEx_df.withColumn(codeField, optUDF(codeEx_df[codeField]))codeEx_df = codeEx_df.withColumn(codeField, F.explode(F.split(codeEx_df[codeField], ",")))code_counts = codeEx_df.groupBy(codeField).count().sort(F.desc("count"))if knownPref is not None:code_counts = code_counts.filter(code_counts[codeField].like("%" + knownPref + "%"))# code_counts.show(10)xy = code_counts.toPandas()# Generating a list of the top 20 most frequently occuring Reject CodestopCodes = xy[codeField].head(topCount).tolist()topCodes = [x.strip() for x in topCodes]res_df = dfreturn addCodes(res_df, codeField, topCodes, optUDF, knownPref)# checks for presence of values in a fielddef isVal(df, field, value):return df.withColumn("is_" + field + "_" + value,F.when(F.col(field) == value,F.lit(1)).otherwise(F.lit(0)))# sums values of a field within a specified windowdef sumVal(df, field, windowval):return df.withColumn("TOT_" + field, F.sum(field).over(windowval))# finds the maximum value of a field within a specified windowdef maxVal(df, field, windowval):return df.withColumn("MAX_" + field, F.max(field).over(windowval))# finds the average value of a field within a specified windowdef meanVal(df, field, windowval):return df.withColumn("MEAN_" + field, F.mean(field).over(windowval))# finds the ratio between two fields of a recorddef fracVal(df, numfield, denomfield, fracName):return df.withColumn(fracName, F.col(numfield) / F.col(denomfield))# adds required fields to the dataframedef addFeatsTrain(vw_cl_lines_df, param_dict):orig = vw_cl_lines_dfwindowval = (Window.partitionBy(param_dict["groupField"]).orderBy(param_dict["windowField"] + "_unix").rangeBetween(Window.unboundedPreceding, -1))codes_df = orig.withColumn("NUM_LINES", F.sum(F.lit(1)).over(windowval))for field in param_dict["isFields"]:codes_df = isVal(codes_df, field[0], field[1])for field in param_dict["sumFields"]:codes_df = sumVal(codes_df, field, windowval)for field in param_dict["maxFields"]:codes_df = maxVal(codes_df, field, windowval)for field in param_dict["meanFields"]:codes_df = meanVal(codes_df, field, windowval)for fracTuple in param_dict["fracTuples"]:codes_df = fracVal(codes_df, fracTuple[0], fracTuple[1], fracTuple[2])def remPref(x):if x is None:return ""x = x.split(",")y = []for item in x:if (('T' not in item) & ('M' not in item)):y.append(item.strip())y = ','.join(y)return yremPrefUDF = F.udf(remPref, StringType())allCodes = {}for code in param_dict["codeFields"]:if len(code) == 1:codes_df, toAdd = codeExtract(codes_df, code[0], 20)if code[0] in allCodes:allCodes[code[0]] = allCodes[code[0]] + toAddelse:allCodes[code[0]] = toAddelse:codes_df, toAdd = codeExtract(codes_df,code[0],20,optUDF=remPrefUDF,knownPref=code[1])if code[0] in allCodes:allCodes[code[0]] = allCodes[code[0]] + toAddelse:allCodes[code[0]] = toAddaddedCols = list(set(codes_df.columns) - set(vw_cl_lines_df.columns))return codes_df, addedCols, allCodesdef addFeatsTest(vw_cl_lines_df, param_dict, summary_df):orig = vw_cl_lines_dfjoinfields = [param_dict['groupField'], "NUM_LINES"]for field in param_dict["sumFields"]:joinfields.append("TOT_"+field)for field in param_dict["maxFields"]:joinfields.append("MAX_"+field)for field in param_dict["meanFields"]:joinfields.append("MEAN_"+field)for fracTuple in param_dict["fracTuples"]:joinfields.append(fracTuple[2])codes_df = orig.join(summary_df[joinfields], param_dict['groupField'],how='left')for field in param_dict["isFields"]:codes_df = isVal(codes_df, field[0], field[1])def remPref(x):if x is None:return ""x = x.split(",")y = []for item in x:if (('T' not in item) & ('M' not in item)):y.append(item.strip())y = ','.join(y)return yremPrefUDF = F.udf(remPref, StringType())allCodes = {}for code in param_dict["codeFields"]:presentInTrain = param_dict["allCodes"][code[0]]if len(code) == 1:codes_df, added = addCodes(codes_df, code[0], presentInTrain, None,None)else:codes_df, added = addCodes(codes_df, code[0], presentInTrain,optUDF=remPrefUDF, knownPref=code[1])addedCols = list(set(codes_df.columns) - set(vw_cl_lines_df.columns))return codes_df, addedCols# prepares the data for use in a training or inference by adding features# and appropriate labelsdef prepTrainData(df, baseFeatures, param_dict):trainData = loadDataset(df, param_dict["custSegOrder"], param_dict)negCount = trainData.filter(trainData[param_dict["labelField"]] ==param_dict["negativeLabel"]).count()posCount = trainData.filter(trainData[param_dict["labelField"]] ==param_dict["positiveLabel"]).count()pos_weight = negCount/posCounttrainData, extraCols, param_dict["allCodes"] = addFeatsTrain(trainData,param_dict)vw_cl_lines_pd = trainData.toPandas()prep_labelled_data_pd = pd.get_dummies(vw_cl_lines_pd,columns=param_dict["BASE_FEATURES_CATEGORICAL"],drop_first=False,prefix_sep="~~")featureCols = extraCols + checkContain(baseFeatures,prep_labelled_data_pd.columns.tolist(),param_dict["LOG_TRANSFORM_FEATURES"] +param_dict["BASE_FEATURES_CATEGORICAL"])param_dict["oneHots"] = [x for x in prep_labelled_data_pd.columns.tolist()if "~~" in x]leakageFeats = ["is_"+str(x[0])+"_"+str(x[1]) for x inparam_dict["isFields"] if x[0] == param_dict["labelField"]]featureCols = [x for x in featureCols if x not in leakageFeats]return prep_labelled_data_pd, featureCols, pos_weight, param_dictdef prepTestData(df, summary, baseFeatures, param_dict):trainData = loadDataset(df, param_dict["custSegOrder"], param_dict)trainData, extraCols = addFeatsTest(trainData, param_dict , summary)vw_cl_lines_pd = trainData.toPandas()prep_labelled_data_pd = addOneHotsTest(vw_cl_lines_pd,param_dict["oneHots"])featureCols = extraCols + checkContain(baseFeatures,prep_labelled_data_pd.columns.tolist(),param_dict["LOG_TRANSFORM_FEATURES"] +param_dict["BASE_FEATURES_CATEGORICAL"])leakageFeats = ["is_"+str(x[0])+"_"+str(x[1]) for x inparam_dict["isFields"] if x[0] == param_dict["labelField"]]featureCols = [x for x in featureCols if x not in leakageFeats]return prep_labelled_data_pd, featureCols# trains and returns an XGBoost Classifierdef trainXGBModel(df, param_dict): # ,onlyWarn = False):pdf, feats, ratio, param_dict = prepTrainData(df, param_dict["baseFeatures"], param_dict)for col in param_dict["BASE_FEATURES_TIMESTAMP"]:pdf[col] = pd.to_datetime(pdf[col], errors='coerce')adf = pdf.replace([np.inf,-np.inf], 0)cols = pdf[feats].columnslabel = np.where(adf[param_dict["labelField"]] ==param_dict["positiveLabel"], 1, 0)x = adf[feats].values #returns a numpy arraystandard_scaler = preprocessing.StandardScaler()x_scaled = standard_scaler.fit_transform(x)adf = pd.DataFrame(x_scaled, columns=adf[feats].columns)adf['label'] = label#X_train, y_train = adf[feats], adf['label']xgb_model, bestThresh = run_xgboost(adf[feats + ['label']], feats, scale_pos_weight= ratio)param_dict["trainedCols"] = list(feats)return xgb_model, feats, param_dict, bestThreshdef updateXGBModel(df, param_dict, model):pandas_df, featureCols, pos_weight = prepTestData(df, param_dict["baseFeatures"], param_dict)pandas_df['label'] = np.where(pandas_df[param_dict["labelField"]] ==param_dict["positiveLabel"], 1, 0)pandas_df = pandas_df.fillna(0)y_train = pandas_df['label'].valuesX_train_pd = pandas_df.drop('label', 1)if len(X_train_pd) > 100000 :X = np.array_split(X_train_pd, 100000)y = np.array_split(y_train, 100000)for i in range(len(X)):xgb_class = XGBClassifier(scale_pos_weight=pos_weight)model = xgb_class.fit(X[i],y[i], xgb_model = model)xgb_model = modelreturn xgb_model, featureCols, param_dict# uses a model to predict valuesdef modelPredict(model, test_df, summary, param_dict, posThresh):test_pdf, feats1 = prepTestData(test_df, summary, param_dict["baseFeatures"], param_dict)for col in param_dict["BASE_FEATURES_TIMESTAMP"]:test_pdf[col] = pd.to_datetime(test_pdf[col], errors='coerce')test_adf = test_pdf.replace([np.inf,-np.inf], 0)x = test_adf[feats1].values #returns a numpy arraystandard_scaler = preprocessing.StandardScaler()x_scaled = standard_scaler.fit_transform(x)test_adf = pd.DataFrame(x_scaled, columns=test_adf[feats1].columns)X_test = test_adf[param_dict["trainedCols"]]result_proba = model.predict_proba(X_test)result = []result = (result_proba[:,1] >= posThresh).astype(int)#result = model.predict(X_test)return result, result_proba

参考文献

/view/529fc3a4a45177232e60a287.html

/p/345828553

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。