保存视频订单到Mysql
CREATE TABLE `video_order` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`user_id` int(11) DEFAULT NULL,
`money` int(11) DEFAULT NULL,
`title` varchar(32) DEFAULT NULL,
`trade_no` varchar(64) DEFAULT NULL,
`create_time` date DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
<!--mysql驱动-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.25</version>
</dependency>
public class MysqlSink extends RichSinkFunction<VideoOrder> {
private Connection conn;
private PreparedStatement ps;
/**
* 初始化连接
*
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {
System.out.println("open=======");
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/xd_order?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&serverTimezone=Asia/Shanghai", "root", "xdclass.net");
String sql = "INSERT INTO `video_order` (`user_id`, `money`, `title`, `trade_no`, `create_time`) VALUES(?,?,?,?,?);";
ps = conn.prepareStatement(sql);
}
/**
* 关闭链接
*
* @throws Exception
*/
@Override
public void close() throws Exception {
System.out.println("close=======");
if (conn != null) {
conn.close();
}
if (ps != null) {
ps.close();
}
}
/**
* 执行对应的sql
*
* @param value
* @param context
* @throws Exception
*/
@Override
public void invoke(VideoOrder value, Context context) throws Exception {
ps.setInt(1, value.getUserId());
ps.setInt(2, value.getMoney());
ps.setString(3, value.getTitle());
ps.setString(4, value.getTradeNo());
ps.setDate(5, new Date(value.getCreateTime().getTime()));
ps.executeUpdate();
}
}