Commit f2c48ac0 authored by 王燊荣's avatar 王燊荣

第一次提交v1.1.0

parents
!**/src/main/**/target/
!**/src/test/**/target/
batching-ck.iml
.idea/*
.idea
target
target/*
.factorypath
.project
.settings
.classpath
/.idea/
/.mvn/
/mvnw
/mvnw.cmd
/HELP.md
\ No newline at end of file
## ClickHouse批量写入SDK
### (一) 注意事项
#### 1. 必须引入Mybatis-Plus
#### 2. !!#ff0000 不可使用!! yandex.clickhouse 相关yml 和驱动 .若有!!#ff0000 需移除!!,并更换为com.clickhouse
> dynamic-db-yandex-clickhouse-vault.yml ==> ru.yandex.clickhouse.ClickHouseDriver
```xml
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
</dependency>
```
#### 3. 使用 com.clickhouse 相关yml 和驱动
>dynamic-db-clickhouse-vault.yml ==> com.clickhouse.jdbc.ClickHouseDriver
```xml
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.4.6</version>
<classifier>all</classifier>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
```
#### 4. 时间类型字段 设置 NULL 会写入 1970-01-01 08:00:00 (ClickHouse DateTime 的默认值)
### (二) 使用步骤
####1. 引入Maven包
Mybatis-Plus !!#ff0000 必须!!
```xml
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>建议 3.5.1 以上</version>
</dependency>
```
batching-ck
```xml
<dependency>
<groupId>com.afanticar</groupId>
<artifactId>batching-ck</artifactId>
<version>1.0.0</version>
</dependency>
```
#### 2. IXXService 改为继承 ICkBatchService
![enter image description here](https://www.tapd.cn/tfl/pictures/202308/tapd_37799108_1690965237_504.png)
#### 3. XXServiceImpl 改为继承 CkBatchServiceImpl
![enter image description here](https://www.tapd.cn/tfl/pictures/202308/tapd_37799108_1690965220_190.png)
```
@DS("rawdata-ck") 使用了Mybatis-plus 多数据源. 若无,且有多数据源需求,可自行引用
```
#### 4. 引用实际Service 使用saveBatchRecordsByInput (List<T> records)
![enter image description here](https://www.tapd.cn/tfl/pictures/202308/tapd_37799108_1690959847_715.png)
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.afanticar</groupId>
<artifactId>batching-ck</artifactId>
<version>1.0.1</version>
<packaging>jar</packaging>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<java.version>1.8</java.version>
<mybatisplus.version>3.5.1</mybatisplus.version>
<clickhouse.jdbc.version>0.4.6</clickhouse.jdbc.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.7</version>
<optional>true</optional>
</dependency>
<dependency>
<!-- please stop using ru.yandex.clickhouse as it's been deprecated -->
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>${clickhouse.jdbc.version}</version>
<optional>true</optional>
<!-- use uber jar with all dependencies included, change classifier to http for smaller jar -->
<classifier>all</classifier>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- 数据库相关 -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatisplus.version}</version>
<optional>true</optional>
</dependency>
</dependencies>
<distributionManagement>
<repository>
<!--此名称要和.m2/settings.xml中设置的ID一致-->
<id>afanti</id>
<name>Releases repository</name>
<url>https://nexus.afanticar.cn/repository/maven-releases/</url>
</repository>
<snapshotRepository>
<id>public</id>
<name>afanticar nexus</name>
<url>https://nexus.afanticar.cn/repository/maven-snapshots/</url>
</snapshotRepository>
</distributionManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<!--发布代码Jar插件-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
<showWarnings>true</showWarnings>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>3.2.1</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
package com.afanticar.service;
import com.baomidou.mybatisplus.extension.service.IService;
import java.util.List;
/**
* @author chin
* @contact chenyan@afanticar.com
* @since 2022/8/23/023
*/
public interface ICkBatchService<T> extends IService<T> {
void saveBatchRecordsByInput(List<T> records);
}
\ No newline at end of file
package com.afanticar.service.impl;
import com.afanticar.service.ICkBatchService;
import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.apache.ibatis.session.SqlSession;
import org.mybatis.spring.SqlSessionTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.lang.reflect.Field;
import java.sql.*;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author chin
* @contact chenyan@afanticar.com
* @since 2022/8/23/023
*/
public class CkBatchServiceImpl<M extends BaseMapper<T>, T> extends ServiceImpl<M, T> implements ICkBatchService<T> {
protected Class<M> mapperClass = this.currentMapperClass();
protected Class<T> entityClass = this.currentModelClass();
@Autowired
private SqlSessionTemplate sqlSessionTemplate;
@Autowired
protected M myBaseMapper;
protected Logger logger = LoggerFactory.getLogger(this.getClass());
private final ConcurrentHashMap<String, String> sqlMap = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, List<String>> columnMap = new ConcurrentHashMap<>();
private static final String TABLE_SUFFIX_ALL = "_all";
private static final String METHOD_NAME_KEY = ".saveBatchRecordsByInput";
private static final String INSERT_SQL_FORMAT = "insert into %s select %s from input('%s') ";
private static final String QUERY_SQL_FORMAT = "select name as col_name, `type` as data_type from system.columns where table = '%s' and database = '%s' order by position asc";
@Override
public void saveBatchRecordsByInput(List<T> records) {
String key = this.mapperClass.getSimpleName() + METHOD_NAME_KEY;
TableName tableNameAnn = this.entityClass.getAnnotation(TableName.class);
String tableName = tableNameAnn.value().endsWith(TABLE_SUFFIX_ALL) ? tableNameAnn.value().substring(0, tableNameAnn.value().length() - 4) : tableNameAnn.value();
if (!sqlMap.containsKey(key)) {
buildSql(tableName, key);
}
String mapperSql = sqlMap.get(key);
List<String> fieldNames = columnMap.get(key);
Connection connection = null;
try {
connection = getConnection();
PreparedStatement ps = connection.prepareStatement(mapperSql);
for (T record : records) {
int i = 1;
for (String column : fieldNames) {
Field field;
try {
field = entityClass.getDeclaredField(column);
} catch (NoSuchFieldException e) {
field = entityClass.getSuperclass().getDeclaredField(column);
}
field.setAccessible(true);
Object val = field.get(record);
if (val == null) {
ps.setNull(i, field.getType().equals(Date.class) ? Types.TIMESTAMP : Types.CHAR);
} else if (val instanceof String[]) {
ps.setArray(i, connection.createArrayOf("String", (String[]) val));
} else if (val instanceof Long[]) {
ps.setArray(i, connection.createArrayOf("Long", (Long[]) val));
} else if (val instanceof Integer[]) {
ps.setArray(i, connection.createArrayOf("Integer", (Integer[]) val));
} else if (val instanceof Date) {
//官方推荐LocalDateTime 不推荐 Timestamp
ps.setObject(i, ((Date) val).toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime());
} else {
ps.setObject(i, val);
}
i++;
}
ps.addBatch();
}
ps.executeBatch();
ps.clearBatch();
} catch (Exception e) {
throw new RuntimeException("批量写入CK异常:" + e.getMessage(), e);
} finally {
try {
if (connection != null) {
connection.close();
}
} catch (SQLException e) {
logger.error("关闭CK连接异常:{}", e.getMessage());
}
}
}
private synchronized void buildSql(String tableName, String key) {
Connection connection = null;
StringJoiner columns = null;
StringJoiner columnAndTypes = null;
List<String> columsList = null;
try {
connection = getConnection();
PreparedStatement ps = connection.prepareStatement(String.format(QUERY_SQL_FORMAT, tableName, connection.getSchema()));
ResultSet set = ps.executeQuery();
columns = new StringJoiner(",", "", "");
columnAndTypes = new StringJoiner(",", "", "");
columsList = new ArrayList<>();
while (set.next()) {
String column = set.getString("col_name");
String dataType = set.getString("data_type");
columns.add(column);
columnAndTypes.add(column + " " + dataType);
columsList.add(toHumpString(column));
}
} catch (Exception e) {
logger.error("初始化CK表【{}】批量写入语句异常:{}", tableName, e.getMessage());
throw new RuntimeException("初始化CK表【" + tableName + "】批量写入语句异常:" + e.getMessage(), e);
} finally {
try {
if (connection != null) {
connection.close();
}
} catch (SQLException e) {
logger.error("关闭CK连接异常:{}", e.getMessage());
}
}
String querySt = String.format(INSERT_SQL_FORMAT, tableName + TABLE_SUFFIX_ALL, columns.toString(), columnAndTypes.toString());
logger.info("初始化CK表【{}】批量写入语句:{}", tableName, querySt);
sqlMap.putIfAbsent(key, querySt);
columnMap.putIfAbsent(key, columsList);
}
private static String toHumpString(String string) {
StringBuilder stringBuilder = new StringBuilder();
String[] str = string.split("_");
for (String string2 : str) {
if (stringBuilder.length() == 0) {
stringBuilder.append(string2);
} else {
stringBuilder.append(string2.substring(0, 1).toUpperCase());
stringBuilder.append(string2.substring(1));
}
}
return stringBuilder.toString();
}
public Connection getConnection() {
Connection conn = null;
try {
SqlSession sqlSession = sqlSessionTemplate.getSqlSessionFactory().openSession();
conn = sqlSession.getConfiguration().getEnvironment().getDataSource().getConnection();
} catch (Exception e) {
logger.error("获取CK连接异常:{}", e.getMessage());
throw new RuntimeException("获取CK连接异常::" + e.getMessage(), e);
}
return conn;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment