Merge files using strategies
This commit is contained in:
@@ -1,25 +1,40 @@
|
||||
from sklearn.cluster import DBSCAN, KMeans
|
||||
import numpy as np
|
||||
from dataclasses import dataclass
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Any, Optional
|
||||
|
||||
@dataclass
class ClusterResult:
    """Outcome of one clustering run.

    Attributes:
        labels: per-sample cluster label (for DBSCAN, ``-1`` marks noise).
        centers: cluster centers when the algorithm produces them (KMeans);
            ``None`` otherwise (DBSCAN).
        statistics: one summary dict per cluster, as built by the strategy's
            ``get_statistics``.
    """

    # NOTE(review): original annotated these as np.array, which is the array
    # *constructor*; np.ndarray is the actual type.
    labels: np.ndarray
    centers: Optional[np.ndarray]
    statistics: list[dict[str, Any]]
|
||||
|
||||
|
||||
class Cluster(ABC):
    """Strategy interface: every clustering algorithm implements ``run``."""

    @abstractmethod
    def run(self, data: np.ndarray) -> ClusterResult:
        """Cluster *data* (one sample per row) and return a ClusterResult."""
        pass
|
||||
|
||||
|
||||
class DBSCANCluster(Cluster):
    """Density-based clustering strategy backed by scikit-learn's DBSCAN."""

    def __init__(self, eps: float = 0.5, min_samples: int = 5):
        """
        Args:
            eps: maximum distance for two samples to be considered neighbours.
            min_samples: neighbourhood size required for a core point.
        """
        self.eps = eps
        self.min_samples = min_samples

    # @typing.override  (decorator available from Python 3.12)
    def run(self, data: np.ndarray) -> ClusterResult:
        """Run DBSCAN on *data*; DBSCAN has no centroids, so centers is None."""
        dbscan = DBSCAN(eps=self.eps, min_samples=self.min_samples)
        labels = dbscan.fit_predict(data)
        return ClusterResult(labels, None, self.get_statistics(data, labels))

    def get_statistics(self, data: np.ndarray, labels: np.ndarray) -> list[dict[str, Any]]:
        """Per-cluster point count and density; the noise label ``-1`` is skipped.

        Density is point count divided by the volume of the cluster's
        axis-aligned bounding box.
        """
        stats = []
        for label in np.unique(labels):
            if label == -1:
                # -1 is DBSCAN's noise bucket, not a real cluster.
                continue
            cluster_points = data[labels == label]
            num_points = len(cluster_points)
            # NOTE(review): a zero-extent bounding box (e.g. a cluster of
            # identical points) divides by zero, yielding inf/nan under
            # numpy's semantics — confirm this is acceptable upstream.
            density = num_points / (np.max(cluster_points, axis=0) - np.min(cluster_points, axis=0)).prod()
            stats.append({
                "cluster": label,
                "num_points": num_points,
                "density": density
            })

        return stats

    def __str__(self) -> str:
        return "DBScan"
|
||||
|
||||
|
||||
class KMeansCluster(Cluster):
    """Centroid-based clustering strategy backed by scikit-learn's KMeans."""

    def __init__(self, n_clusters: int = 8, n_init: int = 1, max_iter: int = 300):
        """
        Args:
            n_clusters: number of clusters (k) to form.
            n_init: number of random centroid initialisations to try.
            max_iter: iteration cap for a single KMeans run.
        """
        self.n_clusters = n_clusters
        self.n_init = n_init
        self.max_iter = max_iter

    # @typing.override  (decorator available from Python 3.12)
    def run(self, data: np.ndarray) -> ClusterResult:
        """Run KMeans on *data*, returning labels, centers and statistics."""
        # random_state is pinned so repeated runs are reproducible.
        kmeans = KMeans(n_clusters=self.n_clusters, init="random", n_init=self.n_init, max_iter=self.max_iter, random_state=111)
        labels = kmeans.fit_predict(data)
        centers = kmeans.cluster_centers_
        return ClusterResult(labels, centers, self.get_statistics(data, labels, centers))

    def get_statistics(self, data: np.ndarray, labels: np.ndarray, centers: np.ndarray) -> list[dict[str, Any]]:
        """Per-cluster point count and centroid (KMeans produces no noise label)."""
        stats = []

        for label in np.unique(labels):
            cluster_points = data[labels == label]
            num_points = len(cluster_points)
            center = centers[label]
            stats.append({
                "cluster": label,
                "num_points": num_points,
                "center": center,
            })
        return stats

    def __str__(self) -> str:
        return "KMeans"
|
||||
|
||||
|
||||
# Registry of available clustering strategies, each instantiated with its defaults.
CLUSTERING_STRATEGIES = [DBSCANCluster(), KMeansCluster()]
|
||||
|
Reference in New Issue
Block a user