【聚类】一种自适应Eps和Minpts的DBSCAN方法(python实现)

论坛 期权论坛 脚本     
匿名技术用户   2020-12-22 06:10   11   0

一、算法来源

1、DBSCAN算法原型

这个算法原型非常简单,有很多博主都有写,大家自己去看看就好了,也不用花太多时间,顶多十分钟就能了解个大概。

2、自适应Eps和Minpts参数

由于该算法对Eps和Minpts参数十分敏感,所以如何确定这两个参数对于DBSCAN来说是很重要的一步,这篇博文是基于李文杰老师的论文《自适应确定DBSCAN算法参数的算法研究》,通过这篇论文,输入数据集即可大致确定这两个参数,从而可以直接在DBSCAN中应用。

二、大致思想

参考论文中提出的,根据数据集提取出Eps候选项(按从小到大排列),然后再提取出Minpts候选项,随后用这些候选项尝试使用DBSCAN算法进行聚类,如果连续的候选项聚类的类别数目相同,那么选择Eps相对较大的那个最为最终参数输入到DBSCAN算法中去。

论文中认为如果连续3个Eps候选项聚类的类别数目相同,那么可以认为数据集在这些参数下逐渐收敛,但是我觉得具体最好看图像是否收敛,所以我就简单的在程序里将聚类数目打印出来,读者可以自行选择聚利时使用的Eps和Minpts参数。

三、程序介绍

0、头文件import

import math
import copy
import numpy as np
from sklearn.cluster import DBSCAN

1、首先定义聚类的“距离”,这里将欧式距离作为聚类距离

def dist(a,b):
    """
    :param a: 样本点
    :param b: 样本点
    :return: 两个样本点之间的欧式距离
    """
    return math.sqrt(math.pow(a[0]-b[0],2) + math.pow(a[1]-b[1],2))

2、根据论文求解Eps和Minpts候选项列表

def returnDk(matrix,k):
    """
    :param matrix: 距离矩阵
    :param k: 第k最近
    :return: 第k最近距离集合
    """
    Dk = []
    for i in range(len(matrix)):
        Dk.append(matrix[i][k])
    return Dk


def returnDkAverage(Dk):
    """
    :param Dk: k-最近距离集合
    :return: Dk的平均值
    """
    sum = 0
    for i in range(len(Dk)):
        sum = sum + Dk[i]
    return sum/len(Dk)


def CalculateDistMatrix(dataset):
    """
    :param dataset: 数据集
    :return: 距离矩阵
    """
    DistMatrix = [[0 for j in range(len(dataset))] for i in range(len(dataset))]
    for i in range(len(dataset)):
        for j in range(len(dataset)):
            DistMatrix[i][j] = dist(dataset[i], dataset[j])
    return DistMatrix


def returnEpsCandidate(dataSet):
    """
    :param dataSet: 数据集
    :return: eps候选集合
    """
    DistMatrix = CalculateDistMatrix(dataSet)
    tmp_matrix = copy.deepcopy(DistMatrix)
    for i in range(len(tmp_matrix)):
        tmp_matrix[i].sort()
    EpsCandidate = []
    for k in range(1,len(dataSet)):
        Dk = returnDk(tmp_matrix,k)
        DkAverage = returnDkAverage(Dk)
        EpsCandidate.append(DkAverage)
    return EpsCandidate


def returnMinptsCandidate(DistMatrix,EpsCandidate):
    """
    :param DistMatrix: 距离矩阵
    :param EpsCandidate: Eps候选列表
    :return: Minpts候选列表
    """
    MinptsCandidate = []
    for k in range(len(EpsCandidate)):
        tmp_eps = EpsCandidate[k]
        tmp_count = 0
        for i in range(len(DistMatrix)):
            for j in range(len(DistMatrix[i])):
                if DistMatrix[i][j] <= tmp_eps:
                    tmp_count = tmp_count + 1
        MinptsCandidate.append(tmp_count/len(dataSet))
    return MinptsCandidate

3、求解出聚类类别数列表

def returnClusterNumberList(dataset,EpsCandidate,MinptsCandidate):
    """
    :param dataset: 数据集
    :param EpsCandidate: Eps候选列表
    :param MinptsCandidate: Minpts候选列表
    :return: 聚类数量列表
    """
    np_dataset = np.array(dataset)  #将dataset转换成numpy_array的形式
    ClusterNumberList = []
    for i in range(len(EpsCandidate)):
        clustering = DBSCAN(eps= EpsCandidate[i],min_samples= MinptsCandidate[i]).fit(np_dataset)
        num_clustering = max(clustering.labels_)
        ClusterNumberList.append(num_clustering)
    return ClusterNumberList

四、程序(完全版)

import math
import copy
import numpy as np
from sklearn.cluster import DBSCAN


def loadDataSet(fileName, splitChar='\t'):
    """
    输入:文件名
    输出:数据集
    描述:从文件读入数据集
    """
    dataSet = []
    with open(fileName) as fr:
        for line in fr.readlines():
            curline = line.strip().split(splitChar)
            fltline = list(map(float, curline))
            dataSet.append(fltline)
    return dataSet


def dist(a,b):
    """
    :param a: 样本点
    :param b: 样本点
    :return: 两个样本点之间的欧式距离
    """
    return math.sqrt(math.pow(a[0]-b[0],2) + math.pow(a[1]-b[1],2))


def returnDk(matrix,k):
    """
    :param matrix: 距离矩阵
    :param k: 第k最近
    :return: 第k最近距离集合
    """
    Dk = []
    for i in range(len(matrix)):
        Dk.append(matrix[i][k])
    return Dk


def returnDkAverage(Dk):
    """
    :param Dk: k-最近距离集合
    :return: Dk的平均值
    """
    sum = 0
    for i in range(len(Dk)):
        sum = sum + Dk[i]
    return sum/len(Dk)


def CalculateDistMatrix(dataset):
    """
    :param dataset: 数据集
    :return: 距离矩阵
    """
    DistMatrix = [[0 for j in range(len(dataset))] for i in range(len(dataset))]
    for i in range(len(dataset)):
        for j in range(len(dataset)):
            DistMatrix[i][j] = dist(dataset[i], dataset[j])
    return DistMatrix


def returnEpsCandidate(dataSet):
    """
    :param dataSet: 数据集
    :return: eps候选集合
    """
    DistMatrix = CalculateDistMatrix(dataSet)
    tmp_matrix = copy.deepcopy(DistMatrix)
    for i in range(len(tmp_matrix)):
        tmp_matrix[i].sort()
    EpsCandidate = []
    for k in range(1,len(dataSet)):
        Dk = returnDk(tmp_matrix,k)
        DkAverage = returnDkAverage(Dk)
        EpsCandidate.append(DkAverage)
    return EpsCandidate


def returnMinptsCandidate(DistMatrix,EpsCandidate):
    """
    :param DistMatrix: 距离矩阵
    :param EpsCandidate: Eps候选列表
    :return: Minpts候选列表
    """
    MinptsCandidate = []
    for k in range(len(EpsCandidate)):
        tmp_eps = EpsCandidate[k]
        tmp_count = 0
        for i in range(len(DistMatrix)):
            for j in range(len(DistMatrix[i])):
                if DistMatrix[i][j] <= tmp_eps:
                    tmp_count = tmp_count + 1
        MinptsCandidate.append(tmp_count/len(dataSet))
    return MinptsCandidate


def returnClusterNumberList(dataset,EpsCandidate,MinptsCandidate):
    """
    :param dataset: 数据集
    :param EpsCandidate: Eps候选列表
    :param MinptsCandidate: Minpts候选列表
    :return: 聚类数量列表
    """
    np_dataset = np.array(dataset)  #将dataset转换成numpy_array的形式
    ClusterNumberList = []
    for i in range(len(EpsCandidate)):
        clustering = DBSCAN(eps= EpsCandidate[i],min_samples= MinptsCandidate[i]).fit(np_dataset)
        num_clustering = max(clustering.labels_)
        ClusterNumberList.append(num_clustering)
    return ClusterNumberList

if __name__ == '__main__':
    dataSet = loadDataSet('788points.txt', splitChar=',')
    EpsCandidate = returnEpsCandidate(dataSet)
    DistMatrix = CalculateDistMatrix(dataSet)
    MinptsCandidate = returnMinptsCandidate(DistMatrix,EpsCandidate)
    ClusterNumberList = returnClusterNumberList(dataSet,EpsCandidate,MinptsCandidate)
    print(EpsCandidate)
    print(MinptsCandidate)
    print('cluster number list is')
    print(ClusterNumberList)

五、相关文件

我会将txt文件和程序放在以下位置:

https://download.csdn.net/download/liyihao17/11125093

https://download.csdn.net/download/liyihao17/11125098

另外我也在github上放了

https://github.com/liyihao17/KANN-DBSCAN

需要的读者可以自行下载,相关论文读者可以自行去知网搜索下载

分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:7942463
帖子:1588486
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP